Author: michiel
Date: 2009-11-20 09:04:31 +0100 (Fri, 20 Nov 2009)
New Revision: 39813
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobDefinition.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/RecognizerResult.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/SkippedResult.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Stage.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/TranscoderResult.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/WaitUntilRecognizedFunction.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/WaitUntilTranscodingFunction.java
Log:
javadoc, svn:keywords
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
2009-11-20 01:20:21 UTC (rev 39812)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
2009-11-20 08:04:31 UTC (rev 39813)
@@ -279,27 +279,36 @@
*/
public void submit(Cloud cloud, int n, ChainedLogger chain) {
JobCallable callable = new JobCallable(this, cloud, chain, n);
+ callable.init();
submit(callable);
}
+
/**
* Re-submit this job.
*/
+ void submit(final JobCallable jc) {
+ if (getStage() == Stage.READY) {
+ LOG.info("Will not submit, because we're ready" + jc);
+ } else {
+ LOG.info("Will submit " + jc);
+ ThreadPools.jobsExecutor.execute(new Runnable() {
+ public void run() {
+ synchronized(Job.this) {
+ findResults();
+ if (getCurrent() == null) {
+ iterator().next();
+ }
+ Stage s = getCurrent().getStage();
+ LOG.info("to " + s);
+ future = processor.threadPools.get(s).submit(jc);
+ Job.this.notifyAll();
+ }
+ }
+ });
+ }
+ }
- synchronized void submit(JobCallable jc) {
- LOG.info("Will submit " + jc);
- findResults();
- if (getCurrent() == null) {
- iterator().next();
- }
- Stage s = getCurrent().getStage();
- LOG.info("to " + s);
- if (getStage() == Stage.READY) {
- } else {
- future = processor.threadPools.get(s).submit(jc);
- notifyAll();
- }
- }
public Logger getLogger() {
return logger;
}
@@ -370,12 +379,12 @@
}
public synchronized void setThread(Thread t) {
- thread = t;
- notifyAll();
- if (t != null) {
- interrupted = t.isInterrupted();
- }
- }
+ thread = t;
+ notifyAll();
+ if (t != null) {
+ interrupted = t.isInterrupted();
+ }
+ }
/**
* The source Node on which this Job will run.
@@ -412,18 +421,17 @@
return getStage().ordinal() >= s.ordinal();
}
synchronized public void ready() {
- ready = true;
- notifyAll();
- }
+ ready = true;
+ notifyAll();
+ }
public synchronized void waitUntil(Stage stage)
throws InterruptedException {
- LOG.info("Waiting for " + stage);
- while (! reached(stage)) {
- wait();
- }
-
+ LOG.info("Waiting for " + stage);
+ while (! reached(stage)) {
+ wait();
}
+ }
public synchronized void waitUntilAfter(Stage stage) throws
InterruptedException {
while (getStage().ordinal() <= stage.ordinal()) {
Property changes on:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
___________________________________________________________________
Name: svn:keywords
+ Id
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
2009-11-20 01:20:21 UTC (rev 39812)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
2009-11-20 08:04:31 UTC (rev 39813)
@@ -22,6 +22,9 @@
/**
* This is the actual callable that can be submitted to the Executors.
* It can actually be submitted multiple times. Until it is ready.
+ *
+ * This boils down to iterating the {@link Job}.
+ * @author Michiel Meeuwissen
*/
class JobCallable implements Callable<Integer> {
private static final Logger LOG =
Logging.getLoggerInstance(JobCallable.class);
@@ -37,17 +40,23 @@
this.ntCloud = cloud;
this.logger = l;
this.node = node;
- init();
+ assert node > 0;
}
- protected void init() {
+
+ Job getJob() {
+ return thisJob;
+ }
+
+ protected synchronized void init() {
if (ntNode == null) {
thisJob.setThread(Thread.currentThread());
if (ntCloud instanceof
org.mmbase.bridge.implementation.BasicCloud) {
try {
synchronized(ntCloud) {
while (! ntCloud.hasNode(node)) {
- ntCloud.wait(200);
+ ntCloud.wait(1000);
+ LOG.info("Still no node " + node + "");
}
}
} catch (InterruptedException ie) {
@@ -59,14 +68,24 @@
ntNode.getStringValue("title"); // This triggers
RelatedField$Creator to create a mediafragment
Node mediafragment = ntNode.getNodeValue("mediafragment");
thisJob.setNode(ntNode);
+ notifyAll();
}
if (iterator == null) {
iterator = thisJob.iterator();
}
}
+ void waitForNode() throws InterruptedException {
+ synchronized(this) {
+ while(thisJob.getNode() == null) {
+ wait();
+ }
+ }
+ }
+
+
public Integer call() {
-
+ init();
int resultCount = 0;
try {
Result result = thisJob.getCurrent();
@@ -89,7 +108,10 @@
}
if (result != null && result.getStage() != current.getStage())
{
LOG.info("Will do next stage " + current.getStage() + "
now (was " + result + "), first returning");
- thisJob.submit(this);
+ try {
+ thisJob.submit(this);
+ } catch (Exception e) {
+ }
return resultCount;
}
result = current;
Property changes on:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
___________________________________________________________________
Name: svn:keywords
+ Id
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobDefinition.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobDefinition.java
2009-11-20 01:20:21 UTC (rev 39812)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobDefinition.java
2009-11-20 08:04:31 UTC (rev 39813)
@@ -18,8 +18,9 @@
/**
- * The description or definition of one 'transcoding' sub job that's doing the
transcoding. This
+ * The description or definition of one 'transcoding' sub jobs that's doing
the transcoding. This
* combines a transcoder, with a mime type for which it must be valid, and a
list of analyzers.
+ * @author Michiel Meeuwissen
*/
class JobDefinition implements Serializable {
private static final long serialVersionUID = 0L;
Property changes on:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobDefinition.java
___________________________________________________________________
Name: svn:keywords
+ Id
Property changes on:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/RecognizerResult.java
___________________________________________________________________
Name: svn:keywords
+ Id
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
2009-11-20 01:20:21 UTC (rev 39812)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
2009-11-20 08:04:31 UTC (rev 39813)
@@ -16,6 +16,11 @@
+/**
+ * When executing an actual {@link JobDefinition} the result is contain in
an object like this.
+ * @author Michiel Meeuwissen
+ * @version $Id$
+ */
public abstract class Result {
final JobDefinition definition;
final URI in;
@@ -30,15 +35,29 @@
return definition;
}
+ /**
+ * The Node receiving the result or <code>null</code> if that is not
applicable (for recognizers).
+ */
public abstract Node getDestination();
//public abstract Node getNode();
+ /**
+ * The file receiving the result, or <code>null</code> if that is not
applicable
+ */
public abstract URI getOut();
+ /**
+ * The file containing the input for the job.
+ */
public URI getIn() {
return in;
}
+
+ /**
+ * Marks this result as ready, meaning that there is nothing left to be
done and {@link #isReady} will return true from now on.
+ * Also, extensions may override this with extra functionality which can
only be done if transcoding is ready.
+ */
public void ready() {
ready = true;
@@ -46,8 +65,15 @@
public boolean isReady() {
return ready;
}
+
+ /**
+ * On what kind of inputs this result can work.
+ */
public abstract MimeType getMimeType();
+ /**
+ * To what {@link Stage} of the transcoding process this result
belongs. Either {@link Stage.RECOGNIZER} or {@link Stage.TRANSCODER}.
+ */
public final Stage getStage() {
return getJobDefinition().getStage();
}
Property changes on:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
___________________________________________________________________
Name: svn:keywords
+ Id
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/SkippedResult.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/SkippedResult.java
2009-11-20 01:20:21 UTC (rev 39812)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/SkippedResult.java
2009-11-20 08:04:31 UTC (rev 39813)
@@ -14,6 +14,10 @@
import java.net.*;
+/**
+ * This is a place holder for the result of a transcoder which is not to be
done, because production
+ * of its source was skipped already, or because the source does not match the
mime type.
+ */
class SkippedResult extends Result {
SkippedResult(JobDefinition def, URI in) {
super(def, in);
Property changes on:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/SkippedResult.java
___________________________________________________________________
Name: svn:keywords
+ Id
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Stage.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Stage.java
2009-11-20 01:20:21 UTC (rev 39812)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Stage.java
2009-11-20 08:04:31 UTC (rev 39813)
@@ -13,6 +13,8 @@
/**
* This enum can contain the 'stage' of the transcoding process
+ * @author Michiel Meeuwissen
+ * @version $Id$
*/
public enum Stage {
/**
Property changes on:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Stage.java
___________________________________________________________________
Name: svn:keywords
+ Id
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/TranscoderResult.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/TranscoderResult.java
2009-11-20 01:20:21 UTC (rev 39812)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/TranscoderResult.java
2009-11-20 08:04:31 UTC (rev 39813)
@@ -21,7 +21,10 @@
/**
- * Container for the result of a JobDefinition
+ * Container for the result of a JobDefinition This is the result of an actual
transcoding. This means that it does have a 'destination' node {@link
#getDestination()} and URI {@link #getOut()}.
+ *
+ * @author Michiel Meeuwissen
+ * @version $Id$
*/
class TranscoderResult extends Result {
private static final Logger LOG =
Logging.getLoggerInstance(TranscoderResult.class);
Property changes on:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/TranscoderResult.java
___________________________________________________________________
Name: svn:keywords
+ Id
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/WaitUntilRecognizedFunction.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/WaitUntilRecognizedFunction.java
2009-11-20 01:20:21 UTC (rev 39812)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/WaitUntilRecognizedFunction.java
2009-11-20 08:04:31 UTC (rev 39813)
@@ -16,7 +16,7 @@
/**
*
* @author André van Toly
- * @version $Id: CreateCachesFunction.java 36715 2009-07-08 22:30:03Z michiel $
+ * @version $Id$
*/
public class WaitUntilRecognizedFunction extends NodeFunction<Boolean> {
Property changes on:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/WaitUntilRecognizedFunction.java
___________________________________________________________________
Name: svn:keywords
- If
+ Id
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/WaitUntilTranscodingFunction.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/WaitUntilTranscodingFunction.java
2009-11-20 01:20:21 UTC (rev 39812)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/WaitUntilTranscodingFunction.java
2009-11-20 08:04:31 UTC (rev 39813)
@@ -16,7 +16,7 @@
/**
*
* @author Michiel Meeuwissen
- * @version $Id: CreateCachesFunction.java 36715 2009-07-08 22:30:03Z michiel $
+ * @version $Id$
*/
public class WaitUntilTranscodingFunction extends NodeFunction<Boolean> {
Property changes on:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/WaitUntilTranscodingFunction.java
___________________________________________________________________
Name: svn:keywords
- If
+ Id
_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs