Update of /var/cvs/applications/crontab/src/org/mmbase/applications/crontab
In directory james.mmbase.org:/tmp/cvs-serv15135/src/org/mmbase/applications/crontab

Modified Files:
        CronDaemon.java CronEntry.java 
Added Files:
        ProposedJobs.java 
Log Message:
first simple version of fix for MMB-1687


See also: http://cvs.mmbase.org/viewcvs/applications/crontab/src/org/mmbase/applications/crontab
See also: http://www.mmbase.org/jira/browse/MMB-1687


ProposedJobs.java is new



Index: CronDaemon.java
===================================================================
RCS file: /var/cvs/applications/crontab/src/org/mmbase/applications/crontab/CronDaemon.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -b -r1.15 -r1.16
--- CronDaemon.java     14 Jul 2008 13:42:36 -0000      1.15
+++ CronDaemon.java     29 Jul 2008 13:36:34 -0000      1.16
@@ -9,7 +9,9 @@
 
 import java.util.*;
 import org.mmbase.util.DynamicDate;
+import org.mmbase.core.event.EventManager;
 import org.mmbase.util.logging.*;
+import java.util.concurrent.DelayQueue;
 
 /**
  * CronDaemon is a "crontab" clone written in java.
@@ -18,9 +20,9 @@
  *
  * @author Kees Jongenburger
  * @author Michiel Meeuwissen
- * @version $Id: CronDaemon.java,v 1.15 2008/07/14 13:42:36 michiel Exp $
+ * @version $Id: CronDaemon.java,v 1.16 2008/07/29 13:36:34 michiel Exp $
  */
-public class CronDaemon  {
+public class CronDaemon  implements ProposedJobs.Listener {
 
     private static final Logger log = 
Logging.getLoggerInstance(CronDaemon.class);
 
@@ -30,6 +32,8 @@
     private Set<CronEntry> removedCronEntries;
     private Set<CronEntry> addedCronEntries;
 
+    private DelayQueue<ProposedJobs.Event> proposedJobs = null;
+
     /**
      * CronDaemon is a Singleton. This makes the one instance and starts the 
Thread.
      */
@@ -37,9 +41,47 @@
         cronEntries = Collections.synchronizedSet(new 
LinkedHashSet<CronEntry>()); // predictable order
         removedCronEntries = Collections.synchronizedSet(new 
HashSet<CronEntry>());
         addedCronEntries = Collections.synchronizedSet(new 
LinkedHashSet<CronEntry>()); // predictable order
+        EventManager.getInstance().addEventListener(this);
         start();
     }
 
+
+    public void notify(ProposedJobs.Event event) {
+        synchronized(proposedJobs) {
+            Iterator<ProposedJobs.Event> i = proposedJobs.iterator();
+            while (i.hasNext()) {
+                ProposedJobs.Event proposed = i.next();
+                if (event.equals(proposed)) {
+                    if (proposed.getMachine().compareTo(event.getMachine()) > 
0) {
+                        event = proposed;
+                    }
+                    // remove any way, to readd later after the loop.
+                    i.remove();
+                }
+            }
+            proposedJobs.add(event);
+        }
+    }
+
+    protected void consumeJobs() {
+        synchronized(proposedJobs) {
+            ProposedJobs.Event event = proposedJobs.poll();
+            while (event != null) {
+                if (event.isLocal()) {
+                    CronEntry proposed = event.getCronEntry();
+                    CronEntry local = getById(cronEntries, 
event.getCronEntry().getId());
+                    if (local.equals(proposed)) {
+                        //local.setLastRun(event.getCronStart());
+                        
org.mmbase.util.ThreadPools.jobsExecutor.execute(local.getExecutable());
+                    }
+                } else {
+                    /// event will be execute somewhere else
+                    /// could administrate this, and perhaps watch if it 
sucessfully succeeds
+                }
+            }
+        }
+    }
+
     /**
      * Finds in given set the CronEntry with the given id.
      * @return a CronEntry if found, <code>null</code> otherwise.
@@ -71,6 +113,10 @@
         } else {
             addEntry(entry);
         }
+        if (entry.getType() == CronEntry.Type.BALANCE && proposedJobs == null) 
{
+            proposedJobs = new DelayQueue<ProposedJobs.Event>();
+            cronTimer.scheduleAtFixedRate(new TimerTask() { public void run() 
{CronDaemon.this.consumeJobs();} }, getFirst(), 60 * 1000);
+        }
 
     }
 
@@ -108,13 +154,7 @@
         log.service("Removed entry " + entry);
     }
 
-    /**
-     * Starts the daemon, which you might want to do if you have stopped if 
for some reason. The
-     * daemon is already started on default.
-     */
-    public void start() {
-        log.info("Starting CronDaemon");
-        cronTimer = new Timer(true);
+    protected Date getFirst() {
         Date first;
         try {
             first = DynamicDate.eval(DynamicDate.getInstance("tominute next 
minute"));
@@ -122,6 +162,17 @@
             log.fatal(parseException); // could not happen
             first = new Date();
         }
+        return first;
+    }
+
+    /**
+     * Starts the daemon, which you might want to do if you have stopped if 
for some reason. The
+     * daemon is already started on default.
+     */
+    public void start() {
+        log.info("Starting CronDaemon");
+        cronTimer = new Timer(true);
+        Date first = getFirst();
         log.debug("First run at " + first);
         cronTimer.scheduleAtFixedRate(new TimerTask() { public void run() 
{CronDaemon.this.run();} }, first, 60 * 1000);
     }


Index: CronEntry.java
===================================================================
RCS file: /var/cvs/applications/crontab/src/org/mmbase/applications/crontab/CronEntry.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -b -r1.12 -r1.13
--- CronEntry.java      29 Jul 2008 10:01:21 -0000      1.12
+++ CronEntry.java      29 Jul 2008 13:36:34 -0000      1.13
@@ -10,6 +10,7 @@
 import java.util.*;
 import java.util.regex.*;
 import org.mmbase.module.core.MMBase;
+import org.mmbase.util.HashCodeUtil;
 
 import org.mmbase.util.logging.*;
 
@@ -18,10 +19,10 @@
  *
  * @author Kees Jongenburger
  * @author Michiel Meeuwissen
- * @version $Id: CronEntry.java,v 1.12 2008/07/29 10:01:21 michiel Exp $
+ * @version $Id: CronEntry.java,v 1.13 2008/07/29 13:36:34 michiel Exp $
  */
 
-public class CronEntry {
+public class CronEntry implements java.io.Serializable {
 
     private static final Logger log = 
Logging.getLoggerInstance(CronEntry.class);
 
@@ -49,7 +50,22 @@
          * The 'can be more' type job is like a 'must be one' job, but the 
run() method of such jobs is even
          * called (when scheduled) if it itself is still running.
          */
-        CANBEMORE;
+        CANBEMORE,
+
+        /**
+         * A job of this type runs exactly once in the load balanced mmbase 
cluster. Before the job
+         * is started, communication between mmbase's in the server will be 
done, to negotiate who
+         * is going to do it.
+         */
+         BALANCE,
+
+
+         /**
+         * NOT IMPLEMENTED. As BALANCED, but no job is started as the previous 
was not yet finished.
+         */
+        BALANCE_MUSTBEONE;
+
+
 
         public static Type DEFAULT = MUSTBEONE;
         public static Type valueOf(int i) {
@@ -62,9 +78,9 @@
      }
 
 
-    private CronJob cronJob;
+    private transient CronJob cronJob;
 
-    private List<Interruptable> threads = Collections.synchronizedList(new 
ArrayList<Interruptable>());
+    private transient List<Interruptable> threads = 
Collections.synchronizedList(new ArrayList<Interruptable>());
 
     private final String id;
     private final String name;
@@ -167,7 +183,7 @@
         return isAlive(0);
     }
 
-    public boolean kick() {
+    Interruptable getExecutable() {
         final Date start = new Date();
         Runnable ready = new Runnable() {
                 public void run() {
@@ -175,18 +191,25 @@
                     CronEntry.this.setLastCost((int) (new Date().getTime() - 
start.getTime()));
                 }
             };
+        setLastRun(start);
+        Interruptable thread = new Interruptable(cronJob, threads, ready);
+        return thread;
+    }
+
+    public boolean kick() {
         switch (type) {
         case SHORT:
             {
                 try {
-                    setLastRun(new Date());
-                    Interruptable thread = new Interruptable(cronJob, threads, 
ready);
-                    thread.run();
+                    getExecutable().run();
                 } catch (Throwable t) {
                     log.error("Error during cron-job " + this +" : " + 
t.getClass().getName() + " " + t.getMessage() + "\n" + Logging.stackTrace(t));
                 }
                 return true;
             }
+        case BALANCE_MUSTBEONE: {
+            return true;
+        }
         case MUSTBEONE:
             if (isAlive()) {
                 return false;
@@ -194,9 +217,7 @@
             // fall through
         case CANBEMORE:
         default :
-            setLastRun(start);
-            Interruptable thread = new Interruptable(cronJob, threads, ready);
-            org.mmbase.util.ThreadPools.jobsExecutor.execute(thread);
+            org.mmbase.util.ThreadPools.jobsExecutor.execute(getExecutable());
             return true;
         }
 
@@ -235,8 +256,8 @@
     public String getConfiguration() {
         return configuration;
     }
-    public String getType() {
-        return type.toString().toLowerCase();
+    public Type getType() {
+        return type;
     }
     public String getClassName() {
         return className;
@@ -308,7 +329,12 @@
     }
 
     public int hashCode() {
-        return id.hashCode() + name.hashCode() + className.hashCode() + 
cronTime.hashCode();
+        int result = 0;
+        result = HashCodeUtil.hashCode(result, id);
+        result = HashCodeUtil.hashCode(result, name);
+        result = HashCodeUtil.hashCode(result, className);
+        result = HashCodeUtil.hashCode(result, cronTime);
+        return result;
     }
 
     public boolean equals(Object o) {
@@ -316,7 +342,9 @@
             return false;
         }
         CronEntry other = (CronEntry)o;
-        return id.equals(other.id) && name.equals(other.name) && 
className.equals(other.className) && cronTime.equals(other.cronTime) && 
servers.equals(other.servers)  && (configuration == null ? other.configuration 
== null : configuration.equals(other.configuration));
+        return id.equals(other.id) && name.equals(other.name) &&
+            className.equals(other.className) && 
cronTime.equals(other.cronTime) && servers.equals(other.servers)
+            && (configuration == null ? other.configuration == null : 
configuration.equals(other.configuration));
     }
 
 
_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs

Reply via email to