Author: andre
Date: 2009-11-23 13:57:47 +0100 (Mon, 23 Nov 2009)
New Revision: 39843
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Processor.java
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
Log:
checking for completed jobs, work in progress
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java
===================================================================
--- mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java	2009-11-23 10:41:32 UTC (rev 39842)
+++ mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Job.java	2009-11-23 12:57:47 UTC (rev 39843)
@@ -49,6 +49,7 @@
private final List<Result> results = new ArrayList<Result>();
private final long number = lastJobNumber++;
private int busy = 0;
+ private int skipped = 0;
Future<Integer> future;
@@ -120,6 +121,7 @@
if (! jd.getMimeType().matches(new
MimeType(inNode.getStringValue("mimetype")))) {
LOG.info("SKIPPING " + jd);
results.set(i, new SkippedResult(jd, inURI));
+ skipped++;
continue;
} else {
LOG.info("NOT SKIPPING " + jd);
@@ -417,9 +419,20 @@
LOG.info("Comparing for " + getStage() + ">=" + s);
return getStage().ordinal() >= s.ordinal();
}
+
synchronized public void ready() {
- ready = true;
+ if (isInterrupted()) {
+ ready = true;
+ }
notifyAll();
+
+ if (future.isDone()) {
+ processor.runningJobs.remove(getNode().getNumber());
+ ready = true;
+ } else {
+ LOG.warn("This job has not completed yet.");
+ }
+ ready = true; // BUG: ?! not correct
}
public synchronized void waitUntil(Stage stage)
@@ -444,7 +457,8 @@
}
public String getProgress() {
- return "" + busy + "/" + results.size();
+ int done = busy + skipped;
+ return "" + done + "/" + results.size();
}
public int getBusy() {
return busy;
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java
===================================================================
--- mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java	2009-11-23 10:41:32 UTC (rev 39842)
+++ mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/JobCallable.java	2009-11-23 12:57:47 UTC (rev 39843)
@@ -175,7 +175,7 @@
throw e;
} finally {
logger.info("FINALLY " + resultCount);
- //thisJob.ready(); // notify waiters
+ thisJob.ready(); // notify waiters
//runningJobs.remove(thisJob.getNode().getNumber());
}
logger.info("3: returning resultCount: " + resultCount);
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Processor.java
===================================================================
--- mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Processor.java	2009-11-23 10:41:32 UTC (rev 39842)
+++ mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Processor.java	2009-11-23 12:57:47 UTC (rev 39843)
@@ -237,7 +237,7 @@
initWatcher();
}
- private static final Map<Integer, Job> runningJobs =
Collections.synchronizedMap(new LinkedHashMap<Integer, Job>());
+ protected static final Map<Integer, Job> runningJobs =
Collections.synchronizedMap(new LinkedHashMap<Integer, Job>());
/* Jobs user with this context has started */
public static Set<Job> myJobs(UserContext u) {
@@ -258,6 +258,17 @@
public static Job getJob(Node node) {
return runningJobs.get(node.getNumber());
}
+
+ protected static boolean removeJob(Node node) {
+ Job job = runningJobs.get(node.getNumber());
+ boolean done = job.future.isDone();
+ if (done) {
+ runningJobs.remove(node.getNumber());
+ } else {
+ job.logger.info("This job has not completed yet.");
+ }
+ return done;
+ }
public static String cancelJob(Node node) {
if (node.getCloud().may(ActionRepository.getInstance().get("streams",
"cancel_jobs"), null)) {
@@ -266,12 +277,18 @@
return "No job for node #" + node.getNumber();
} else {
job.interrupt();
+ String msg = "";
+ if (job.future.isDone()) {
+ msg = "This job has already completed. ";
+ job.logger.info(msg);
+ }
if (job.future.cancel(true)) {
- String message = "Canceled job for node #" +
node.getNumber() + " (" + job.future + ")";
+ String message = msg + "Canceled job for node #" +
node.getNumber() + " (" + job.future + ")";
job.logger.info(message);
+ removeJob(node);
return message;
} else {
- return "Could not cancel " + job;
+ return msg + "Could not cancel " + job;
}
}
} else {
Modified:
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java
===================================================================
--- mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java	2009-11-23 10:41:32 UTC (rev 39842)
+++ mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/createcaches/Result.java	2009-11-23 12:57:47 UTC (rev 39843)
@@ -60,7 +60,6 @@
*/
public void ready() {
ready = true;
-
}
public boolean isReady() {
return ready;
_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs