Author: michiel
Date: 2009-11-12 18:18:02 +0100 (Thu, 12 Nov 2009)
New Revision: 39676
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilRecognizedFunction.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilTranscodingFunction.java
mmbase/trunk/applications/streams/src/main/resources/org/mmbase/streams/resources/createcaches.xsd
mmbase/trunk/applications/streams/src/test/java/org/mmbase/streams/CreateCachesTest.java
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/crazycreatecaches.xml
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_0.xml
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_1.xml
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_2.xml
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_3.xml
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_4.xml
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_5.xml
Log:
MMB-1873
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
2009-11-12 17:18:02 UTC (rev 39676)
@@ -87,20 +87,36 @@
}
private static int transSeq = 0;
- public final ThreadPoolExecutor transcoderExecutor = new
ThreadPoolExecutor(3, 3, 5 * 60 , TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+
+ public final Map<Stage, ThreadPoolExecutor> threadPools = new
EnumMap<Stage, ThreadPoolExecutor>(Stage.class);
+ private final List<CommandExecutor.Method> executors = new
CopyOnWriteArrayList<CommandExecutor.Method>();
+ {
+ threadPools.put(Stage.TRANSCODER, new ThreadPoolExecutor(3, 3, 5 * 60
, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return ThreadPools.newThread(r, "TranscoderThread-" +
Stage.TRANSCODER + "-" + (transSeq++));
+ }
+ }));
+ threadPools.put(Stage.RECOGNIZER, new ThreadPoolExecutor(3, 3, 5 * 60
, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable r) {
- return ThreadPools.newThread(r, "TranscoderThread-" +
(transSeq++));
+ return ThreadPools.newThread(r, "TranscoderThread-" +
Stage.RECOGNIZER + "-" + (transSeq++));
}
- });
- {
- ThreadPools.getThreadPools().put(CreateCachesProcessor.class.getName()
+ ".transcoders", transcoderExecutor);
- }
+ }));
+ // fill the complete map, so we don't have to think about it any more
later on.
+ for (Stage s : Stage.values()) {
+ if (s.ordinal() < Stage.TRANSCODER.ordinal()) {
+ threadPools.put(s, threadPools.get(Stage.TRANSCODER));
+ }
+ if (s.ordinal() > Stage.RECOGNIZER.ordinal()) {
+ threadPools.put(s, threadPools.get(Stage.RECOGNIZER));
+ }
+ }
+ for (Map.Entry<Stage, ThreadPoolExecutor> e : threadPools.entrySet()) {
+
ThreadPools.getThreadPools().put(CreateCachesProcessor.class.getName() + "." +
e.getKey().toString(), e.getValue());
+ }
- private final List<CommandExecutor.Method> executors = new
CopyOnWriteArrayList<CommandExecutor.Method>();
- {
- executors.add(new CommandExecutor.Method());
- executors.add(new CommandExecutor.Method());
- executors.add(new CommandExecutor.Method());
+ for (int i = 0; i < 6; i++) {
+ executors.add(new CommandExecutor.Method());
+ }
}
private String configFile = "streams/createcaches.xml";
@@ -112,7 +128,7 @@
Map<String, JobDefinition> newList = new
LinkedHashMap<String, JobDefinition>();
List<CommandExecutor.Method> newExecutors = new
ArrayList<CommandExecutor.Method>();
Document document =
getResourceLoader().getDocument(resource);
- int totalTranscoders = 0;
+ Map<Stage, Integer> totals = new EnumMap<Stage,
Integer>(Stage.class);
if (document != null) {
org.w3c.dom.NodeList ellist =
document.getDocumentElement().getChildNodes();
@@ -159,26 +175,36 @@
LOG.warn("" + newList + " already
contains an entry with id " + id);
}
newList.put(id, def);
-
+
} else if
(el.getTagName().equals("localhost")) {
int max =
Integer.parseInt(el.getAttribute("max_simultaneous_transcoders"));
- totalTranscoders += max;
+ Stage s =
Stage.valueOf(el.getAttribute("stage").toUpperCase());
+ Integer t = totals.get(s);
+ if (t == null) t = 0;
+ t += max;
+ totals.put(s, t);
for (int j = 1; j <= max; j++) {
newExecutors.add(new
CommandExecutor.Method());
}
} else if (el.getTagName().equals("server")) {
int max =
Integer.parseInt(el.getAttribute("max_simultaneous_transcoders"));
- totalTranscoders += max;
- String host = el.getAttribute("host");
- int port =
Integer.parseInt(el.getAttribute("port"));
- for (int j = 1; j <= max; j++) {
- newExecutors.add(new
CommandExecutor.Method(host, port));
- }
+ Stage s =
Stage.valueOf(el.getAttribute("stage").toUpperCase());
+ Integer t = totals.get(s);
+ if (t == null) t = 0;
+ t += max;
+ totals.put(s, t);
+ String host = el.getAttribute("host");
+ int port =
Integer.parseInt(el.getAttribute("port"));
+ for (int j = 1; j <= max; j++) {
+ newExecutors.add(new
CommandExecutor.Method(host, port));
+ }
}
}
}
- transcoderExecutor.setCorePoolSize(totalTranscoders);
-
transcoderExecutor.setMaximumPoolSize(totalTranscoders);
+ for (Map.Entry<Stage, Integer> e : totals.entrySet()) {
+
threadPools.get(e.getKey()).setCorePoolSize(e.getValue());
+
threadPools.get(e.getKey()).setMaximumPoolSize(e.getValue());
+ }
} else {
LOG.warn("No " + resource);
}
@@ -188,7 +214,7 @@
executors.clear();
executors.addAll(newExecutors);
}
- LOG.service("Reading of configuration file " + resource +
" successfull. Transcoders now " + list + ". Executors " + executors + ". Max
simultaneous transcoders: " + totalTranscoders);
+ LOG.service("Reading of configuration file " + resource +
" successfull. Transcoders now " + list + ". Executors " + executors + ". Max
simultaneous transcoders: " + totals);
} catch (Exception e) {
LOG.error(e.getClass() + " " + e.getMessage() + " In " +
resource + " Transcoders now " + list + " (not changed)", e);
}
@@ -282,7 +308,7 @@
final Job thisJob = new Job(ntCloud, logger);
runningJobs.put(node, thisJob);
- thisJob.setFuture(transcoderExecutor.submit(new
JobCallable(thisJob, ntCloud, logger, node)));
+ thisJob.submit(ntCloud, node, logger);
return thisJob;
}
@@ -291,7 +317,7 @@
/**
- * Creates caches nodes when not existing by creating a transcoding Job
+ * Creates caches nodes when not existing by creating a transcoding Job
* @param ntCloud a non transactional cloud
* @param int node number
* @return Job recognizing and/or transcoding the source stream
@@ -334,7 +360,7 @@
if (node.getNumber() > 0) {
if (node.isChanged(field.getName())) {
LOG.service("For node " + node.getNumber() + ", the field '" +
field.getName() + "' is changed " + node.getChanged() + ". That means that we
must schedule create caches");
-
+
final Cloud ntCloud =
node.getCloud().getNonTransactionalCloud();
final int nodeNumber = node.getNumber();
createCaches(ntCloud, nodeNumber);
@@ -375,7 +401,7 @@
final String label;
final Stage stage;
-
+
/**
* Creates an JobDefinition template (used in the configuration
container).
*/
@@ -453,6 +479,10 @@
}
public abstract MimeType getMimeType();
+ public final Stage getStage() {
+ return getJobDefinition().getStage();
+ }
+
}
/**
@@ -483,7 +513,7 @@
public void ready() {
super.ready();
if (dest != null) {
- LOG.info("Setting " + dest.getNumber() + " to done");
+ LOG.service("Setting " + dest.getNumber() + " to done");
File outFile = new File(getDirectory(),
dest.getStringValue("url").replace("/", File.separator));
dest.setLongValue("filesize", outFile.length());
if (outFile.length() > 1) { // @TODO: there should maybe
be other ways to detect if a transcoding failed
@@ -514,9 +544,9 @@
}
-
+
/**
- * Result of a recognizer JobDefinition, just recognizes the type of
stream etc.
+ * Result of a recognizer JobDefinition, just recognizes the type of
stream etc.
* Does not transcode. The result out is the same as in, same for mimetype.
*/
public class RecognizerResult extends Result {
@@ -600,11 +630,14 @@
logger.setMaxSize(100);
logger.setMaxAge(60000);
chain.addLogger(logger);
+ for (Map.Entry<String, JobDefinition> dum :
CreateCachesProcessor.this.list.entrySet()) {
+ results.add(null);
+ }
}
/**
- * Defines the several Results by reading the JobDefinitions in the
list.
- * Creates streamsourcescaches for transcoders and asigns
TranscoderResults to them or creates
+ * Defines the several Results by reading the JobDefinitions in the
list.
+ * Creates streamsourcescaches for transcoders and asigns
TranscoderResults to them or creates
* RecognizerResults for JobDefinitions of recognizers.
*/
protected void findResults() {
@@ -615,7 +648,7 @@
JobDefinition jd = entry.getValue();
URI inURI;
Node inNode;
-
+
// inNode (input stream) to be used
if (jd.getInId() == null) {
String url = node.getStringValue("url");
@@ -698,7 +731,6 @@
dest.commit();
- System.out.println("Created " + dest);
String destFile = dest.getStringValue("url");
assert destFile != null;
assert destFile.length() > 0;
@@ -724,7 +756,7 @@
URI outURI = outFile.toURI();
Result result = new TranscoderResult(jd, dest, inURI,
outURI);
-
+
LOG.info("Added result to results list with key: " +
dest.getStringValue("key"));
results.set(i, result);
lookup.put(jd.getId(), result);
@@ -743,10 +775,11 @@
public Iterator<Result> iterator() {
return new Iterator<Result>() {
- int i = -1;
+ int i = 0;
public boolean hasNext() {
- for (int j = i + 1 ; j < results.size(); j++) {
+ for (int j = i ; j < results.size(); j++) {
if (results.get(j) != null && !
results.get(j).isReady()) {
+ LOG.debug("Found to do at " + j + " -> " +
results.get(j));
return true;
}
}
@@ -757,12 +790,16 @@
}
public Result next() {
synchronized(Job.this) {
- while(current == null || current.isReady()) {
- i++;
- current = results.get(i);
- Job.this.notifyAll();
+ for (; i < results.size(); i++) {
+ if (results.get(i) != null && !
results.get(i).isReady()) {
+ current = results.get(i);
+ Job.this.notifyAll();
+ i++;
+ break;
+ }
}
}
+
if (current.definition.transcoder instanceof
CommandTranscoder) {
// Get free method
CommandExecutor.Method m = null;
@@ -784,7 +821,7 @@
Node destination = current.getDestination();
if (destination != null) {
try {
- LOG.info("Setting " +
destination.getNodeManager().getName() + " " + destination.getNumber() + " to
BUSY");
+ LOG.debug("Setting " +
destination.getNodeManager().getName() + " " + destination.getNumber() + " to
BUSY " + i);
destination.setIntValue("state",
State.BUSY.getValue());
destination.commit();
} catch (Exception e) {
@@ -792,15 +829,40 @@
}
}
busy++;
+ LOG.debug(" Returning at " + i + " " + current + " ("
+current.isReady());
return current;
}
};
}
- public void setFuture(Future<Integer> f) {
- future = f;
+
+ /**
+ * Start actually executing this Jobs.
+ */
+ public void submit(Cloud cloud, int n, ChainedLogger chain) {
+ JobCallable callable = new JobCallable(this, cloud, chain, n);
+ submit(callable);
}
+
+ /**
+ * Re-submit this job.
+ */
+
+ public synchronized void submit(JobCallable jc) {
+ LOG.info("Will submit " + jc);
+ findResults();
+ if (getCurrent() == null) {
+ iterator().next();
+ }
+ Stage s = getCurrent().getStage();
+ LOG.info("to " + s);
+ if (getStage() == Stage.READY) {
+ } else {
+ future =
CreateCachesProcessor.this.threadPools.get(s).submit(jc);
+ notifyAll();
+ }
+ }
public Logger getLogger() {
return logger;
}
@@ -888,12 +950,10 @@
mediaprovider = node.getNodeValue("mediaprovider");
assert mediafragment != null : "Mediafragment should not be null";
//assert mediaprovider != null : "Mediaprovider should not be
null";
- for (Map.Entry<String, JobDefinition> dum :
CreateCachesProcessor.this.list.entrySet()) {
- results.add(null);
- }
findResults();
}
public synchronized void interrupt() {
+
Node cacheNode = current.getDestination();
if (cacheNode != null &&
node.getCloud().hasNode(cacheNode.getNumber())) {
cacheNode.setIntValue("state", State.INTERRUPTED.getValue());
@@ -910,47 +970,37 @@
public boolean isInterrupted() {
return interrupted;
}
- public boolean isReady() {
- return ready;
+ public boolean reached(Stage s) {
+ LOG.info("Comparing for " + getStage() + ">=" + s);
+ return getStage().ordinal() >= s.ordinal();
}
- public synchronized void ready() {
- ready = true;
- notifyAll();
- }
+ synchronized public void ready() {
+ ready = true;
+ notifyAll();
+ }
- public synchronized void waitUntilReady() throws InterruptedException {
- while (! isReady()) {
- wait();
+ public synchronized void waitUntil(Stage stage)
+ throws InterruptedException {
+ LOG.info("Waiting for " + stage);
+ while (! reached(stage)) {
+ wait();
+ }
+
}
- }
+
+ public synchronized void waitUntilAfter(Stage stage) throws
InterruptedException {
+ while (getStage().ordinal() <= stage.ordinal()) {
+ wait();
+ }
+ }
+
public Stage getStage() {
if (ready) return Stage.READY;
Result res = getCurrent();
return res == null ? Stage.UNSTARTED :
res.getJobDefinition().getStage();
}
- public void waitUntilRecognized() throws InterruptedException {
- Stage stage = getStage();
- while (stage.ordinal() < Stage.RECOGNIZER.ordinal()) {
- synchronized(this) {
- LOG.info("Not yet recognized, but " + stage + " " +
getCurrent());
- wait(100000);
- }
- stage = getStage();
- }
- }
- public void waitUntilTranscoding() throws InterruptedException {
- Stage stage = getStage();
- while (stage.ordinal() < Stage.TRANSCODER.ordinal()) {
- synchronized(this) {
- LOG.info("Not yet transcoding, but " + stage + " " +
getCurrent());
- wait(100000);
- }
- stage = getStage();
- }
- }
-
public String getProgress() {
return "" + busy + "/" + results.size();
}
@@ -980,14 +1030,33 @@
}
}
-
+ /**
+ * This enum can contain the 'stage' of the transcoding process
+ */
public static enum Stage {
+ /**
+ * Nothing happend yet.
+ */
UNSTARTED,
+ /**
+ * Currently busy with a recognizer
+ */
RECOGNIZER,
+ /**
+ * Currently busy with a transcoder
+ */
TRANSCODER,
+ /**
+ * Everything's done!
+ */
READY;
}
+
+ /**
+ * This is the actual callable that can be submitted to the Executors.
+ * It can actually be submitted multiple times. Until it is ready.
+ */
protected class JobCallable implements Callable<Integer> {
private final Job thisJob;
private final Cloud ntCloud;
@@ -1001,6 +1070,7 @@
this.ntCloud = cloud;
this.logger = l;
this.node = node;
+ init();
}
protected void init() {
@@ -1029,13 +1099,33 @@
}
public Integer call() {
- init();
int resultCount = 0;
try {
- LOG.info("Executing " + thisJob);
- while (iterator.hasNext()) {
- final Result result = iterator.next();
+ Result result = thisJob.getCurrent();
+ while (true) {
+ LOG.info("Executing " + thisJob);
+ Result current = thisJob.getCurrent();
+ if (current == null || current.isReady()) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ }
+ current = thisJob.getCurrent();
+
+ LOG.info("Found " + current);
+ if (current.isReady()) {
+ thisJob.ready();
+ return resultCount;
+
+ }
+
+ }
+ if (result != null && result.getStage() !=
current.getStage()) {
+ LOG.info("Will do next stage " + current.getStage() +
" now (was " + result + "), first returning");
+ thisJob.submit(this);
+ return resultCount;
+ }
+ result = current;
LOG.info("NOW doing " + result);
URI in = result.getIn();
URI out = result.getOut();
@@ -1085,8 +1175,8 @@
throw e;
} finally {
logger.info("FINALLY " + resultCount);
- thisJob.ready(); // notify waiters
- runningJobs.remove(thisJob.getNode().getNumber());
+ //thisJob.ready(); // notify waiters
+ //runningJobs.remove(thisJob.getNode().getNumber());
}
return resultCount;
}
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilRecognizedFunction.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilRecognizedFunction.java
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilRecognizedFunction.java
2009-11-12 17:18:02 UTC (rev 39676)
@@ -18,7 +18,7 @@
/**
*
- * @author André van Toly
+ * @author Andr� van Toly
* @version $Id: CreateCachesFunction.java 36715 2009-07-08 22:30:03Z michiel $
*/
@@ -35,7 +35,7 @@
CreateCachesProcessor.Job job = CreateCachesProcessor.getJob(node);
if (job != null) {
try {
- job.waitUntilRecognized();
+ job.waitUntilAfter(CreateCachesProcessor.Stage.RECOGNIZER);
} catch (InterruptedException ie) {
return false;
}
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilTranscodingFunction.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilTranscodingFunction.java
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilTranscodingFunction.java
2009-11-12 17:18:02 UTC (rev 39676)
@@ -35,7 +35,7 @@
CreateCachesProcessor.Job job = CreateCachesProcessor.getJob(node);
if (job != null) {
try {
- job.waitUntilTranscoding();
+ job.waitUntil(CreateCachesProcessor.Stage.TRANSCODER);
} catch (InterruptedException ie) {
return false;
}
Modified:
mmbase/trunk/applications/streams/src/main/resources/org/mmbase/streams/resources/createcaches.xsd
===================================================================
---
mmbase/trunk/applications/streams/src/main/resources/org/mmbase/streams/resources/createcaches.xsd
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/main/resources/org/mmbase/streams/resources/createcaches.xsd
2009-11-12 17:18:02 UTC (rev 39676)
@@ -54,12 +54,14 @@
<xsd:element name="localhost">
<xsd:complexType>
<xsd:attribute name="max_simultaneous_transcoders" type="xsd:integer"
use="required" />
+ <xsd:attribute name="stage" type="type_stage"
use="required" />
</xsd:complexType>
</xsd:element>
<xsd:element name="server">
<xsd:complexType>
<xsd:attribute name="max_simultaneous_transcoders" type="xsd:integer"
use="required" />
+ <xsd:attribute name="stage" type="type_stage"
use="required" />
<xsd:attribute name="host" type="xsd:string" use="required" />
<xsd:attribute name="port" type="xsd:integer" use="required" />
</xsd:complexType>
@@ -93,4 +95,11 @@
</xsd:complexType>
</xsd:element>
+ <xsd:simpleType name="type_stage">
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="recognizer" />
+ <xsd:enumeration value="transcoder" />
+ </xsd:restriction>
+ </xsd:simpleType>
+
</xsd:schema>
Modified:
mmbase/trunk/applications/streams/src/test/java/org/mmbase/streams/CreateCachesTest.java
===================================================================
---
mmbase/trunk/applications/streams/src/test/java/org/mmbase/streams/CreateCachesTest.java
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/test/java/org/mmbase/streams/CreateCachesTest.java
2009-11-12 17:18:02 UTC (rev 39676)
@@ -106,7 +106,7 @@
- //@Test
+ @Test
public void node() {
Cloud cloud = getCloud();
assumeNotNull(cloud);
@@ -191,7 +191,7 @@
CreateCachesProcessor proc = get("crazycreatecaches.xml");
Node source = getNode(proc.getDirectory());
CreateCachesProcessor.Job job =
proc.createCaches(source.getCloud(), source.getNumber());
- job.waitUntilReady();
+ job.waitUntil(CreateCachesProcessor.Stage.READY);
assertTrue("No node " + source.getNumber() + " in " +
source.getCloud(), source.getCloud().hasNode(source.getNumber()));
source = refresh(source);
}
@@ -204,7 +204,7 @@
Node source = getNode(proc.getDirectory());
CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(),
source.getNumber());
- job.waitUntilReady();
+ job.waitUntil(CreateCachesProcessor.Stage.READY);
assertTrue("No node " + source.getNumber() + " in " +
source.getCloud(), source.getCloud().hasNode(source.getNumber()));
source = refresh(source);
@@ -217,7 +217,7 @@
Node source = getNode(proc.getDirectory());
CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(),
source.getNumber());
source.commit();
- job.waitUntilReady();
+ job.waitUntil(CreateCachesProcessor.Stage.READY);
assertTrue(source.getCloud().hasNode(source.getNumber()));
source = refresh(source);
checkSource(source, 2);
@@ -230,7 +230,7 @@
Node source = getNode(proc.getDirectory());
CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(),
source.getNumber());
source.commit();
- job.waitUntilReady();
+ job.waitUntil(CreateCachesProcessor.Stage.READY);
source = refresh(source);
checkSource(source, 3);
}
@@ -241,7 +241,7 @@
Node source = getNode(proc.getDirectory());
CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(),
source.getNumber());
source.commit();
- job.waitUntilReady();
+ job.waitUntil(CreateCachesProcessor.Stage.READY);
source = refresh(source);
checkSource(source, 4);
}
@@ -252,7 +252,7 @@
Node source = getNode(proc.getDirectory());
CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(),
source.getNumber());
source.commit();
- job.waitUntilReady();
+ job.waitUntil(CreateCachesProcessor.Stage.READY);
source = refresh(source);
checkSource(source, 1);
}
@@ -263,7 +263,7 @@
Node source = getNode(proc.getDirectory());
CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(),
source.getNumber());
source.commit();
- job.waitUntilReady();
+ job.waitUntil(CreateCachesProcessor.Stage.READY);
source = refresh(source);
checkSource(source, 4);
}
Modified:
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/crazycreatecaches.xml
===================================================================
---
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/crazycreatecaches.xml
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/crazycreatecaches.xml
2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,7 +3,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches
http://www.mmbase.org/xmlns/createcaches.xsd"
>
- <localhost max_simultaneous_transcoders="1" />
+ <localhost max_simultaneous_transcoders="1" stage="recognizer"/>
<!-- No caches at all, recognizing only -->
Modified:
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_0.xml
===================================================================
---
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_0.xml
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_0.xml
2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,7 +3,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches
http://www.mmbase.org/xmlns/createcaches.xsd"
>
- <localhost max_simultaneous_transcoders="1" />
+ <localhost max_simultaneous_transcoders="1" stage="recognizer" />
<!-- No caches at all, recognizing only -->
Modified:
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_1.xml
===================================================================
---
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_1.xml
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_1.xml
2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,7 +3,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches
http://www.mmbase.org/xmlns/createcaches.xsd"
>
- <localhost max_simultaneous_transcoders="1" />
+ <localhost max_simultaneous_transcoders="1" stage="recognizer" />
+ <localhost max_simultaneous_transcoders="1" stage="transcoder" />
<recognizer id="recognizer">
Modified:
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_2.xml
===================================================================
---
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_2.xml
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_2.xml
2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,7 +3,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches
http://www.mmbase.org/xmlns/createcaches.xsd"
>
- <localhost max_simultaneous_transcoders="1" />
+ <localhost max_simultaneous_transcoders="1" stage="recognizer" />
+ <localhost max_simultaneous_transcoders="1" stage="transcoder" />
<recognizer id="recognizer">
Modified:
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_3.xml
===================================================================
---
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_3.xml
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_3.xml
2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,9 +3,11 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches
http://www.mmbase.org/xmlns/createcaches.xsd"
>
- <localhost max_simultaneous_transcoders="1" />
+ <localhost max_simultaneous_transcoders="1" stage="recognizer" />
+ <localhost max_simultaneous_transcoders="1" stage="transcoder" />
+
<recognizer id="recognizer">
<class name="org.mmbase.streams.transcoders.FFMpegRecognizer">
</class>
Modified:
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_4.xml
===================================================================
---
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_4.xml
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_4.xml
2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,7 +3,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches
http://www.mmbase.org/xmlns/createcaches.xsd"
>
- <localhost max_simultaneous_transcoders="1" />
+ <localhost max_simultaneous_transcoders="1" stage="recognizer" />
+ <localhost max_simultaneous_transcoders="1" stage="transcoder" />
<recognizer id="recognizer">
Modified:
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_5.xml
===================================================================
---
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_5.xml
2009-11-12 16:36:50 UTC (rev 39675)
+++
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_5.xml
2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,9 +3,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches
http://www.mmbase.org/xmlns/createcaches.xsd"
>
- <localhost max_simultaneous_transcoders="1" />
+ <localhost max_simultaneous_transcoders="1" stage="recognizer" />
+ <localhost max_simultaneous_transcoders="1" stage="transcoder" />
-
<recognizer id="recognizer">
<class name="org.mmbase.streams.transcoders.FFMpegRecognizer">
</class>
_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs