Author: michiel
Date: 2010-05-03 14:22:51 +0200 (Mon, 03 May 2010)
New Revision: 42083
Modified:
mmbase/trunk/applications/crontab/src/main/java/org/mmbase/applications/crontab/CronDaemon.java
Log:
ported from 1.9
Modified:
mmbase/trunk/applications/crontab/src/main/java/org/mmbase/applications/crontab/CronDaemon.java
===================================================================
--- mmbase/trunk/applications/crontab/src/main/java/org/mmbase/applications/crontab/CronDaemon.java	2010-05-03 11:47:03 UTC (rev 42082)
+++ mmbase/trunk/applications/crontab/src/main/java/org/mmbase/applications/crontab/CronDaemon.java	2010-05-03 12:22:51 UTC (rev 42083)
@@ -35,12 +35,13 @@
private final Set<CronEntry> addedCronEntries = Collections.synchronizedSet(new LinkedHashSet<CronEntry>()); // predictable order
private final DelayQueue<ProposedJobs.Event> proposedJobs = new DelayQueue<ProposedJobs.Event>();
+ private final DelayQueue<RunningCronEntry> runningJobs = new DelayQueue<RunningCronEntry>();
+
private ScheduledFuture proposedFuture;
private ScheduledFuture failedFuture;
private ScheduledFuture future;
- private final DelayQueue<RunningCronEntry> runningJobs = new DelayQueue<RunningCronEntry>();
/**
* CronDaemon is a Singleton. This makes the one instance and starts the Thread.
@@ -54,33 +55,29 @@
@Override
public void notify(ProposedJobs.Event event) {
log.debug("Received " + event);
- if (proposedJobs != null) {
- synchronized(proposedJobs) {
- log.debug("" + proposedJobs.size());
- Iterator<ProposedJobs.Event> i = proposedJobs.iterator();
- while (i.hasNext()) {
- ProposedJobs.Event proposed = i.next();
- if (event.equals(proposed)) {
- log.debug("Found job " + event + " already ");
- if (proposed.getMachine().compareTo(event.getMachine()) < 0) {
- log.debug("Will prefer " + proposed.getMachine());
- event = proposed;
- } else {
- log.debug("Will prefer " + event.getMachine());
- }
- // remove any way, to readd later after the loop.
- i.remove();
- break; //can be only one
+ synchronized(proposedJobs) {
+ log.debug("" + proposedJobs.size());
+ Iterator<ProposedJobs.Event> i = proposedJobs.iterator();
+ while (i.hasNext()) {
+ ProposedJobs.Event proposed = i.next();
+ if (event.equals(proposed)) {
+ log.debug("Found job " + event + " already ");
+ if (proposed.getMachine().compareTo(event.getMachine()) < 0) {
+ log.debug("Will prefer " + proposed.getMachine());
+ event = proposed;
} else {
- log.debug("" + event + " != " + proposed);
+ log.debug("Will prefer " + event.getMachine());
}
+ // remove any way, to readd later after the loop.
+ i.remove();
+ break; //can be only one
+ } else {
+ log.debug("" + event + " != " + proposed);
}
- log.debug("Scheduling " + event);
- proposedJobs.add(event);
- log.debug("" + proposedJobs.size());
}
- } else {
- log.service("Ignored " + event + " because we don't have jobs of type " + CronEntry.Type.BALANCE);
+ log.debug("Scheduling " + event);
+ proposedJobs.add(event);
+ log.debug("" + proposedJobs.size());
}
}
@Override
@@ -122,6 +119,9 @@
}
+ /**
+ * Consumes received job proposals
+ */
protected void consumeJobs() {
synchronized(proposedJobs) {
for (ProposedJobs.Event event = proposedJobs.poll(); event != null; event = proposedJobs.poll()) {
@@ -150,12 +150,8 @@
}
public List<ProposedJobs.Event> getQueue() {
- if (proposedJobs != null) {
- synchronized(proposedJobs) {
- return new ArrayList<ProposedJobs.Event>(proposedJobs);
- }
- } else {
- return Collections.emptyList();
+ synchronized(proposedJobs) {
+ return new ArrayList<ProposedJobs.Event>(proposedJobs);
}
}
public List<RunningCronEntry> getRunning() {
@@ -165,10 +161,12 @@
}
protected void detectFailedJobs() {
+ /*
synchronized(runningJobs) {
for (RunningCronEntry running = runningJobs.poll(); running != null; running = runningJobs.poll()) {
}
}
+ */
}
@@ -212,25 +210,6 @@
protected void addEntry(CronEntry entry) {
entry.init();
synchronized(cronEntries) {
- if ((entry.getType() == CronEntry.Type.BALANCE || entry.getType() == CronEntry.Type.BALANCE_MUSTBEONE)
- && proposedFuture == null) {
- proposedFuture = ThreadPools.scheduler.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- CronDaemon.this.consumeJobs();
- }
- }, getFirst(), 60 * 1000, TimeUnit.MILLISECONDS);
- ThreadPools.identify(proposedFuture, "Crontab's poposed balanced job consumer");
- }
- if (failedFuture == null) {
- failedFuture = ThreadPools.scheduler.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- CronDaemon.this.detectFailedJobs();
- }
- }, getFirst(), 60 * 1000, TimeUnit.MILLISECONDS);
- ThreadPools.identify(failedFuture, "Crontab's failed job detector (unfinished)");
- }
cronEntries.add(entry);
}
log.service("Added entry " + entry);
@@ -281,12 +260,29 @@
long first = getFirst();
log.debug("First run at " + first);
future = ThreadPools.scheduler.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- CronDaemon.this.run();
- }
- }, first, 60 * 1000, TimeUnit.MILLISECONDS);
+ @Override
+ public void run() {
+ CronDaemon.this.run();
+ }
+ }, first, 60 * 1000, TimeUnit.MILLISECONDS);
ThreadPools.identify(future, "Crontab's job kicker");
+
+ proposedFuture = ThreadPools.scheduler.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ CronDaemon.this.consumeJobs();
+ }
+ }, getFirst(), 60 * 1000, TimeUnit.MILLISECONDS);
+ ThreadPools.identify(proposedFuture, "Crontab's poposed balanced job consumer");
+
+ failedFuture = ThreadPools.scheduler.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ CronDaemon.this.detectFailedJobs();
+ }
+ }, getFirst(), 60 * 1000, TimeUnit.MILLISECONDS);
+ ThreadPools.identify(failedFuture, "Crontab's failed job detector (unfinished)");
+
}
/**
_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs