Author: michiel
Date: 2009-09-09 18:26:18 +0200 (Wed, 09 Sep 2009)
New Revision: 38532
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
Log:
Added synchronization on runningJobs in createJob, and some extra logging.
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
===================================================================
---
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
2009-09-09 16:25:46 UTC (rev 38531)
+++
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
2009-09-09 16:26:18 UTC (rev 38532)
@@ -265,80 +265,82 @@
* @return job trans coding a source stream in (an)other stream(s)
*/
private Job createJob(final Node node, final ChainedLogger logger) {
- Job job = runningJobs.get(node.getNumber());
- if (job != null) {
- LOG.warn("This job is already running, node #" + node.getNumber());
- // already running
- return null;
- }
- final Job thisJob = new Job(node, logger);
- runningJobs.put(node.getNumber(), thisJob);
+ synchronized(runningJobs) {
+ Job job = runningJobs.get(node.getNumber());
+ if (job != null) {
+ LOG.warn("This job is already running, node #" +
node.getNumber());
+ // already running
+ return null;
+ }
+ final Job thisJob = new Job(node, logger);
+ runningJobs.put(node.getNumber(), thisJob);
- thisJob.setFuture(transcoderExecutor.submit(new Callable<Integer>() {
- public Integer call() {
- thisJob.setThread(Thread.currentThread());
- int resultCount = 0;
- try {
- LOG.info("Executing " + thisJob);
- for (final Result result : thisJob) {
- LOG.info("NOW doing " + result);
- URI in = result.getIn();
- URI out = result.getOut();
+ thisJob.setFuture(transcoderExecutor.submit(new
Callable<Integer>() {
+ public Integer call() {
+ thisJob.setThread(Thread.currentThread());
+ int resultCount = 0;
+ try {
+ LOG.info("Executing " + thisJob);
+ for (final Result result : thisJob) {
+ LOG.info("NOW doing " + result);
+ URI in = result.getIn();
+ URI out = result.getOut();
- JobDefinition jd = result.getJobDefinition();
- final List<AnalyzerLogger> analyzerLoggers =
new ArrayList<AnalyzerLogger>();
- for (Analyzer a: jd.analyzers) {
- AnalyzerLogger al = new
AnalyzerLogger(a.clone(), thisJob.getNode(), result.getDestination());
- analyzerLoggers.add(al);
- logger.addLogger(al);
- }
- assert in != null;
+ JobDefinition jd =
result.getJobDefinition();
+ final List<AnalyzerLogger> analyzerLoggers
= new ArrayList<AnalyzerLogger>();
+ for (Analyzer a: jd.analyzers) {
+ AnalyzerLogger al = new
AnalyzerLogger(a.clone(), thisJob.getNode(), result.getDestination());
+ analyzerLoggers.add(al);
+ logger.addLogger(al);
+ }
+ assert in != null;
- try {
- jd.transcoder.transcode(in, out, logger);
- for (AnalyzerLogger al : analyzerLoggers) {
-
al.getAnalyzer().ready(thisJob.getNode(), result.getDestination());
- }
- resultCount++;
- result.ready();
- logger.info("RESULT " + thisJob + "(" +
thisJob.getNode().getNodeManager().getName() + ":" +
thisJob.getNode().getNumber() + "):" + result);
- if (thisJob.isInterrupted() ||
Thread.currentThread().isInterrupted()){
+ try {
+ jd.transcoder.transcode(in, out,
logger);
+ for (AnalyzerLogger al :
analyzerLoggers) {
+
al.getAnalyzer().ready(thisJob.getNode(), result.getDestination());
+ }
+ resultCount++;
+ result.ready();
+ logger.info("RESULT " + thisJob + "("
+ thisJob.getNode().getNodeManager().getName() + ":" +
thisJob.getNode().getNumber() + "):" + result);
+ if (thisJob.isInterrupted() ||
Thread.currentThread().isInterrupted()){
+ logger.info("Interrupted");
+ break;
+ }
+ } catch (InterruptedException ie) {
+ thisJob.interrupt();
logger.info("Interrupted");
break;
+ } catch (Throwable e) {
+ logger.error(e.getMessage(), e);
+ } finally {
+ for (AnalyzerLogger al :
analyzerLoggers) {
+ logger.removeLogger(al);
+ }
}
- } catch (InterruptedException ie) {
- thisJob.interrupt();
- logger.info("Interrupted");
- break;
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- } finally {
- for (AnalyzerLogger al : analyzerLoggers) {
- logger.removeLogger(al);
- }
+ thisJob.findResults();
+
}
- thisJob.findResults();
+ if (! thisJob.isInterrupted()) {
+ logger.info("READY " + thisJob + "(" +
thisJob.getNode().getNodeManager().getName() + ":" +
thisJob.getNode().getNumber() + ")");
+ thisJob.getNode().commit();
+ }
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ throw e;
+ } finally {
+ logger.info("FINALLY " + resultCount);
+ thisJob.ready(); // notify waiters
+
runningJobs.remove(thisJob.getNode().getNumber());
+ }
+ return resultCount;
- }
- if (! thisJob.isInterrupted()) {
- logger.info("READY " + thisJob + "(" +
thisJob.getNode().getNodeManager().getName() + ":" +
thisJob.getNode().getNumber() + ")");
- thisJob.getNode().commit();
- }
- } catch (RuntimeException e) {
- logger.error(e.getMessage(), e);
- throw e;
- } finally {
- logger.info("FINALLY " + resultCount);
- thisJob.ready(); // notify waiters
- runningJobs.remove(thisJob.getNode().getNumber());
}
- return resultCount;
-
- }
})
);
- return thisJob;
+ return thisJob;
+ }
}
@@ -392,7 +394,7 @@
if (node.getNumber() > 0) {
if (node.isChanged(field.getName())) {
-
+ LOG.service("For node " + node.getNumber() + ", the field '" +
field.getName() + "' is changed " + node.getChanged() + ". That means that we
mush schedule create caches");
final Cloud ntCloud =
node.getCloud().getNonTransactionalCloud();
final int nodeNumber = node.getNumber();
ThreadPools.scheduler.schedule(new Runnable() {
_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs