Update of /var/cvs/applications/crontab/src/org/mmbase/applications/crontab
In directory
james.mmbase.org:/tmp/cvs-serv15135/src/org/mmbase/applications/crontab
Modified Files:
CronDaemon.java CronEntry.java
Added Files:
ProposedJobs.java
Log Message:
first simple version of fix for MMB-1687
See also:
http://cvs.mmbase.org/viewcvs/applications/crontab/src/org/mmbase/applications/crontab
See also: http://www.mmbase.org/jira/browse/MMB-1687
ProposedJobs.java is new
Index: CronDaemon.java
===================================================================
RCS file:
/var/cvs/applications/crontab/src/org/mmbase/applications/crontab/CronDaemon.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -b -r1.15 -r1.16
--- CronDaemon.java 14 Jul 2008 13:42:36 -0000 1.15
+++ CronDaemon.java 29 Jul 2008 13:36:34 -0000 1.16
@@ -9,7 +9,9 @@
import java.util.*;
import org.mmbase.util.DynamicDate;
+import org.mmbase.core.event.EventManager;
import org.mmbase.util.logging.*;
+import java.util.concurrent.DelayQueue;
/**
* CronDaemon is a "crontab" clone written in java.
@@ -18,9 +20,9 @@
*
* @author Kees Jongenburger
* @author Michiel Meeuwissen
- * @version $Id: CronDaemon.java,v 1.15 2008/07/14 13:42:36 michiel Exp $
+ * @version $Id: CronDaemon.java,v 1.16 2008/07/29 13:36:34 michiel Exp $
*/
-public class CronDaemon {
+public class CronDaemon implements ProposedJobs.Listener {
private static final Logger log =
Logging.getLoggerInstance(CronDaemon.class);
@@ -30,6 +32,8 @@
private Set<CronEntry> removedCronEntries;
private Set<CronEntry> addedCronEntries;
+ private DelayQueue<ProposedJobs.Event> proposedJobs = null;
+
/**
* CronDaemon is a Singleton. This makes the one instance and starts the
Thread.
*/
@@ -37,9 +41,47 @@
cronEntries = Collections.synchronizedSet(new
LinkedHashSet<CronEntry>()); // predictable order
removedCronEntries = Collections.synchronizedSet(new
HashSet<CronEntry>());
addedCronEntries = Collections.synchronizedSet(new
LinkedHashSet<CronEntry>()); // predictable order
+ EventManager.getInstance().addEventListener(this);
start();
}
+
+ public void notify(ProposedJobs.Event event) {
+ synchronized(proposedJobs) {
+ Iterator<ProposedJobs.Event> i = proposedJobs.iterator();
+ while (i.hasNext()) {
+ ProposedJobs.Event proposed = i.next();
+ if (event.equals(proposed)) {
+ if (proposed.getMachine().compareTo(event.getMachine()) >
0) {
+ event = proposed;
+ }
+ // remove anyway, to re-add later after the loop.
+ i.remove();
+ }
+ }
+ proposedJobs.add(event);
+ }
+ }
+
+ protected void consumeJobs() {
+ synchronized(proposedJobs) {
+ ProposedJobs.Event event = proposedJobs.poll();
+ while (event != null) {
+ if (event.isLocal()) {
+ CronEntry proposed = event.getCronEntry();
+ CronEntry local = getById(cronEntries,
event.getCronEntry().getId());
+ if (local.equals(proposed)) {
+ //local.setLastRun(event.getCronStart());
+
org.mmbase.util.ThreadPools.jobsExecutor.execute(local.getExecutable());
+ }
+ } else {
+ /// event will be executed somewhere else
+ /// could keep track of this, and perhaps watch whether it
completes successfully
+ }
+ }
+ }
+ }
+
/**
* Finds in given set the CronEntry with the given id.
* @return a CronEntry if found, <code>null</code> otherwise.
@@ -71,6 +113,10 @@
} else {
addEntry(entry);
}
+ if (entry.getType() == CronEntry.Type.BALANCE && proposedJobs == null)
{
+ proposedJobs = new DelayQueue<ProposedJobs.Event>();
+ cronTimer.scheduleAtFixedRate(new TimerTask() { public void run()
{CronDaemon.this.consumeJobs();} }, getFirst(), 60 * 1000);
+ }
}
@@ -108,13 +154,7 @@
log.service("Removed entry " + entry);
}
- /**
- * Starts the daemon, which you might want to do if you have stopped if
for some reason. The
- * daemon is already started on default.
- */
- public void start() {
- log.info("Starting CronDaemon");
- cronTimer = new Timer(true);
+ protected Date getFirst() {
Date first;
try {
first = DynamicDate.eval(DynamicDate.getInstance("tominute next
minute"));
@@ -122,6 +162,17 @@
log.fatal(parseException); // could not happen
first = new Date();
}
+ return first;
+ }
+
+ /**
+ * Starts the daemon, which you might want to do if you have stopped it
for some reason. The
+ * daemon is already started by default.
+ */
+ public void start() {
+ log.info("Starting CronDaemon");
+ cronTimer = new Timer(true);
+ Date first = getFirst();
log.debug("First run at " + first);
cronTimer.scheduleAtFixedRate(new TimerTask() { public void run()
{CronDaemon.this.run();} }, first, 60 * 1000);
}
Index: CronEntry.java
===================================================================
RCS file:
/var/cvs/applications/crontab/src/org/mmbase/applications/crontab/CronEntry.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -b -r1.12 -r1.13
--- CronEntry.java 29 Jul 2008 10:01:21 -0000 1.12
+++ CronEntry.java 29 Jul 2008 13:36:34 -0000 1.13
@@ -10,6 +10,7 @@
import java.util.*;
import java.util.regex.*;
import org.mmbase.module.core.MMBase;
+import org.mmbase.util.HashCodeUtil;
import org.mmbase.util.logging.*;
@@ -18,10 +19,10 @@
*
* @author Kees Jongenburger
* @author Michiel Meeuwissen
- * @version $Id: CronEntry.java,v 1.12 2008/07/29 10:01:21 michiel Exp $
+ * @version $Id: CronEntry.java,v 1.13 2008/07/29 13:36:34 michiel Exp $
*/
-public class CronEntry {
+public class CronEntry implements java.io.Serializable {
private static final Logger log =
Logging.getLoggerInstance(CronEntry.class);
@@ -49,7 +50,22 @@
* The 'can be more' type job is like a 'must be one' job, but the
run() method of such jobs is even
* called (when scheduled) if it itself is still running.
*/
- CANBEMORE;
+ CANBEMORE,
+
+ /**
+ * A job of this type runs exactly once in the load balanced mmbase
cluster. Before the job
+ * is started, communication between mmbase's in the server will be
done, to negotiate who
+ * is going to do it.
+ */
+ BALANCE,
+
+
+ /**
+ * NOT IMPLEMENTED. As BALANCE, but no job is started while the previous one
has not yet finished.
+ */
+ BALANCE_MUSTBEONE;
+
+
public static Type DEFAULT = MUSTBEONE;
public static Type valueOf(int i) {
@@ -62,9 +78,9 @@
}
- private CronJob cronJob;
+ private transient CronJob cronJob;
- private List<Interruptable> threads = Collections.synchronizedList(new
ArrayList<Interruptable>());
+ private transient List<Interruptable> threads =
Collections.synchronizedList(new ArrayList<Interruptable>());
private final String id;
private final String name;
@@ -167,7 +183,7 @@
return isAlive(0);
}
- public boolean kick() {
+ Interruptable getExecutable() {
final Date start = new Date();
Runnable ready = new Runnable() {
public void run() {
@@ -175,18 +191,25 @@
CronEntry.this.setLastCost((int) (new Date().getTime() -
start.getTime()));
}
};
+ setLastRun(start);
+ Interruptable thread = new Interruptable(cronJob, threads, ready);
+ return thread;
+ }
+
+ public boolean kick() {
switch (type) {
case SHORT:
{
try {
- setLastRun(new Date());
- Interruptable thread = new Interruptable(cronJob, threads,
ready);
- thread.run();
+ getExecutable().run();
} catch (Throwable t) {
log.error("Error during cron-job " + this +" : " +
t.getClass().getName() + " " + t.getMessage() + "\n" + Logging.stackTrace(t));
}
return true;
}
+ case BALANCE_MUSTBEONE: {
+ return true;
+ }
case MUSTBEONE:
if (isAlive()) {
return false;
@@ -194,9 +217,7 @@
// fall through
case CANBEMORE:
default :
- setLastRun(start);
- Interruptable thread = new Interruptable(cronJob, threads, ready);
- org.mmbase.util.ThreadPools.jobsExecutor.execute(thread);
+ org.mmbase.util.ThreadPools.jobsExecutor.execute(getExecutable());
return true;
}
@@ -235,8 +256,8 @@
public String getConfiguration() {
return configuration;
}
- public String getType() {
- return type.toString().toLowerCase();
+ public Type getType() {
+ return type;
}
public String getClassName() {
return className;
@@ -308,7 +329,12 @@
}
public int hashCode() {
- return id.hashCode() + name.hashCode() + className.hashCode() +
cronTime.hashCode();
+ int result = 0;
+ result = HashCodeUtil.hashCode(result, id);
+ result = HashCodeUtil.hashCode(result, name);
+ result = HashCodeUtil.hashCode(result, className);
+ result = HashCodeUtil.hashCode(result, cronTime);
+ return result;
}
public boolean equals(Object o) {
@@ -316,7 +342,9 @@
return false;
}
CronEntry other = (CronEntry)o;
- return id.equals(other.id) && name.equals(other.name) &&
className.equals(other.className) && cronTime.equals(other.cronTime) &&
servers.equals(other.servers) && (configuration == null ? other.configuration
== null : configuration.equals(other.configuration));
+ return id.equals(other.id) && name.equals(other.name) &&
+ className.equals(other.className) &&
cronTime.equals(other.cronTime) && servers.equals(other.servers)
+ && (configuration == null ? other.configuration == null :
configuration.equals(other.configuration));
}
_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs