Author: andre
Date: 2009-11-23 13:57:47 +0100 (Mon, 23 Nov 2009)
New Revision: 39843

Modified:
   
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
   
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
   
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Processor.java
   
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
Log:
checking for completed jobs, work in progress

Modified: 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
===================================================================
--- 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
    2009-11-23 10:41:32 UTC (rev 39842)
+++ 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
    2009-11-23 12:57:47 UTC (rev 39843)
@@ -49,6 +49,7 @@
     private final List<Result>        results = new ArrayList<Result>();
     private final long number = lastJobNumber++;
     private int busy = 0;
+    private int skipped = 0;
 
     Future<Integer> future;
 
@@ -120,6 +121,7 @@
                 if (! jd.getMimeType().matches(new 
MimeType(inNode.getStringValue("mimetype")))) {
                     LOG.info("SKIPPING " + jd);
                     results.set(i, new SkippedResult(jd, inURI));
+                    skipped++;
                     continue;
                 } else {
                     LOG.info("NOT SKIPPING " + jd);
@@ -417,9 +419,20 @@
         LOG.info("Comparing for " + getStage() + ">=" + s);
         return getStage().ordinal() >= s.ordinal();
     }
+    
     synchronized public void ready() {
-        ready = true;
+        if (isInterrupted()) {
+            ready = true;
+        }
         notifyAll();
+
+        if (future.isDone()) {
+            processor.runningJobs.remove(getNode().getNumber());
+            ready = true;
+        } else {
+            LOG.warn("This job has not completed yet.");
+        }
+        ready = true;   // BUG: ?! not correct
     }
 
     public synchronized void waitUntil(Stage stage)
@@ -444,7 +457,8 @@
     }
 
     public String getProgress() {
-        return "" + busy + "/" + results.size();
+        int done = busy + skipped;
+        return "" + done + "/" + results.size();
     }
     public int getBusy() {
         return busy;

Modified: 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
===================================================================
--- 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
    2009-11-23 10:41:32 UTC (rev 39842)
+++ 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
    2009-11-23 12:57:47 UTC (rev 39843)
@@ -175,7 +175,7 @@
             throw e;
         } finally {
             logger.info("FINALLY " + resultCount);
-            //thisJob.ready(); // notify waiters
+            thisJob.ready(); // notify waiters
             //runningJobs.remove(thisJob.getNode().getNumber());
         }
         logger.info("3: returning resultCount: " + resultCount);

Modified: 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Processor.java
===================================================================
--- 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Processor.java
      2009-11-23 10:41:32 UTC (rev 39842)
+++ 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Processor.java
      2009-11-23 12:57:47 UTC (rev 39843)
@@ -237,7 +237,7 @@
         initWatcher();
     }
 
-    private static final Map<Integer, Job> runningJobs = 
Collections.synchronizedMap(new LinkedHashMap<Integer, Job>());
+    protected static final Map<Integer, Job> runningJobs = 
Collections.synchronizedMap(new LinkedHashMap<Integer, Job>());
 
     /* Jobs user with this context has started */
     public static Set<Job> myJobs(UserContext u) {
@@ -258,6 +258,17 @@
     public static Job getJob(Node node) {
         return runningJobs.get(node.getNumber());
     }
+    
+    protected static boolean removeJob(Node node) {
+        Job job = runningJobs.get(node.getNumber());
+        boolean done = job.future.isDone();
+        if (done) {
+            runningJobs.remove(node.getNumber());
+        } else {
+            job.logger.info("This job has not completed yet.");
+        }
+        return done;
+    }
 
     public static String cancelJob(Node node) {
         if (node.getCloud().may(ActionRepository.getInstance().get("streams", 
"cancel_jobs"), null)) {
@@ -266,12 +277,18 @@
                 return "No job for node #" + node.getNumber();
             } else {
                 job.interrupt();
+                String msg = "";
+                if (job.future.isDone()) {
+                    msg = "This job has already completed. ";
+                    job.logger.info(msg);
+                }
                 if (job.future.cancel(true)) {
-                    String message = "Canceled job for node #" + 
node.getNumber() + " (" + job.future + ")";
+                    String message = msg + "Canceled job for node #" + 
node.getNumber() + " (" + job.future + ")";
                     job.logger.info(message);
+                    removeJob(node);
                     return message;
                 } else {
-                    return "Could not cancel " + job;
+                    return msg + "Could not cancel " + job;
                 }
             }
         } else {

Modified: 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
===================================================================
--- 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
 2009-11-23 10:41:32 UTC (rev 39842)
+++ 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
 2009-11-23 12:57:47 UTC (rev 39843)
@@ -60,7 +60,6 @@
      */
     public void ready() {
         ready = true;
-
     }
     public boolean isReady() {
         return ready;

_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs

Reply via email to