Author: michiel
Date: 2009-09-09 18:26:18 +0200 (Wed, 09 Sep 2009)
New Revision: 38532

Modified:
   mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
Log:
added some synchronization and logging

Modified: mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
===================================================================
--- mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java	2009-09-09 16:25:46 UTC (rev 38531)
+++ mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java	2009-09-09 16:26:18 UTC (rev 38532)
@@ -265,80 +265,82 @@
      * @return job trans coding a source stream in (an)other stream(s)
      */
     private Job createJob(final Node node, final ChainedLogger logger) {
-        Job job = runningJobs.get(node.getNumber());
-        if (job != null) {
-            LOG.warn("This job is already running, node #" + node.getNumber());
-            // already running
-            return null;
-        }
-        final Job thisJob = new Job(node, logger);
-        runningJobs.put(node.getNumber(), thisJob);
+        synchronized(runningJobs) {
+            Job job = runningJobs.get(node.getNumber());
+            if (job != null) {
+                LOG.warn("This job is already running, node #" + node.getNumber());
+                // already running
+                return null;
+            }
+            final Job thisJob = new Job(node, logger);
+            runningJobs.put(node.getNumber(), thisJob);
 
-        thisJob.setFuture(transcoderExecutor.submit(new Callable<Integer>() {
-                    public Integer call() {
-                        thisJob.setThread(Thread.currentThread());
-                        int resultCount = 0;
-                        try {
-                            LOG.info("Executing " + thisJob);
-                            for (final Result result : thisJob) {
-                                LOG.info("NOW doing " + result);
-                                URI in  = result.getIn();
-                                URI out = result.getOut();
+            thisJob.setFuture(transcoderExecutor.submit(new Callable<Integer>() {
+                        public Integer call() {
+                            thisJob.setThread(Thread.currentThread());
+                            int resultCount = 0;
+                            try {
+                                LOG.info("Executing " + thisJob);
+                                for (final Result result : thisJob) {
+                                    LOG.info("NOW doing " + result);
+                                    URI in  = result.getIn();
+                                    URI out = result.getOut();
 
-                                JobDefinition jd = result.getJobDefinition();
-                                final List<AnalyzerLogger> analyzerLoggers = new ArrayList<AnalyzerLogger>();
-                                for (Analyzer a: jd.analyzers) {
-                                    AnalyzerLogger al = new AnalyzerLogger(a.clone(), thisJob.getNode(), result.getDestination());
-                                    analyzerLoggers.add(al);
-                                    logger.addLogger(al);
-                                }
-                                assert in != null;
+                                    JobDefinition jd = result.getJobDefinition();
+                                    final List<AnalyzerLogger> analyzerLoggers = new ArrayList<AnalyzerLogger>();
+                                    for (Analyzer a: jd.analyzers) {
+                                        AnalyzerLogger al = new AnalyzerLogger(a.clone(), thisJob.getNode(), result.getDestination());
+                                        analyzerLoggers.add(al);
+                                        logger.addLogger(al);
+                                    }
+                                    assert in != null;
 
-                                try {
-                                    jd.transcoder.transcode(in, out, logger);
-                                    for (AnalyzerLogger al : analyzerLoggers) {
-                                        al.getAnalyzer().ready(thisJob.getNode(), result.getDestination());
-                                    }
-                                    resultCount++;
-                                    result.ready();
-                                    logger.info("RESULT " + thisJob + "(" + thisJob.getNode().getNodeManager().getName() + ":" + thisJob.getNode().getNumber() + "):" + result);
-                                    if (thisJob.isInterrupted() || Thread.currentThread().isInterrupted()){
+                                    try {
+                                        jd.transcoder.transcode(in, out, logger);
+                                        for (AnalyzerLogger al : analyzerLoggers) {
+                                            al.getAnalyzer().ready(thisJob.getNode(), result.getDestination());
+                                        }
+                                        resultCount++;
+                                        result.ready();
+                                        logger.info("RESULT " + thisJob + "(" + thisJob.getNode().getNodeManager().getName() + ":" + thisJob.getNode().getNumber() + "):" + result);
+                                        if (thisJob.isInterrupted() || Thread.currentThread().isInterrupted()){
+                                            logger.info("Interrupted");
+                                            break;
+                                        }
+                                    } catch (InterruptedException ie) {
+                                        thisJob.interrupt();
                                         logger.info("Interrupted");
                                         break;
+                                    } catch (Throwable e) {
+                                        logger.error(e.getMessage(), e);
+                                    } finally {
+                                        for (AnalyzerLogger al : analyzerLoggers) {
+                                            logger.removeLogger(al);
+                                        }
                                     }
-                                } catch (InterruptedException ie) {
-                                    thisJob.interrupt();
-                                    logger.info("Interrupted");
-                                    break;
-                                } catch (Throwable e) {
-                                    logger.error(e.getMessage(), e);
-                                } finally {
-                                    for (AnalyzerLogger al : analyzerLoggers) {
-                                        logger.removeLogger(al);
-                                    }
+                                    thisJob.findResults();
+
                                 }
-                                thisJob.findResults();
+                                if (! thisJob.isInterrupted()) {
+                                    logger.info("READY " + thisJob + "(" + thisJob.getNode().getNodeManager().getName() + ":" + thisJob.getNode().getNumber() + ")");
+                                    thisJob.getNode().commit();
+                                }
+                            } catch (RuntimeException e) {
+                                logger.error(e.getMessage(), e);
+                                throw e;
+                            } finally {
+                                logger.info("FINALLY " + resultCount);
+                                thisJob.ready(); // notify waiters
+                                runningJobs.remove(thisJob.getNode().getNumber());
+                            }
+                            return resultCount;
 
-                            }
-                            if (! thisJob.isInterrupted()) {
-                                logger.info("READY " + thisJob + "(" + thisJob.getNode().getNodeManager().getName() + ":" + thisJob.getNode().getNumber() + ")");
-                                thisJob.getNode().commit();
-                            }
-                        } catch (RuntimeException e) {
-                            logger.error(e.getMessage(), e);
-                            throw e;
-                        } finally {
-                            logger.info("FINALLY " + resultCount);
-                            thisJob.ready(); // notify waiters
-                            runningJobs.remove(thisJob.getNode().getNumber());
                         }
-                        return resultCount;
-
-                    }
                     })
                 );
 
-        return thisJob;
+            return thisJob;
+        }
 
     }
 
@@ -392,7 +394,7 @@
         if (node.getNumber() > 0) {
 
             if (node.isChanged(field.getName())) {
-
+                LOG.service("For node " + node.getNumber() + ", the field '" + field.getName() + "' is changed " + node.getChanged() + ". That means that we mush schedule create caches");
                 final Cloud ntCloud = node.getCloud().getNonTransactionalCloud();
                 final int nodeNumber = node.getNumber();
                 ThreadPools.scheduler.schedule(new Runnable() {

_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs

Reply via email to