Author: cziegeler
Date: Thu Dec 31 11:19:09 2009
New Revision: 894821

URL: http://svn.apache.org/viewvc?rev=894821&view=rev
Log:
SLING-1002 : Reduce memory consumption and improve startup behaviour of the job 
handler

Modified:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=894821&r1=894820&r2=894821&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
 Thu Dec 31 11:19:09 2009
@@ -160,6 +160,12 @@
     /** Number of parallel jobs for the main queue. */
     private long parallelJobCount;
 
+    /** Number of jobs to load from the repository on startup in one go. */
+    private long maxLoadJobs;
+
+    /** Threshold - if the queue is lower than this threshold the repository 
is checked for events. */
+    private long loadThreshold;
+
     /**
      * Activate this component.
      * @param context
@@ -177,6 +183,12 @@
         this.componentContext = context;
         super.activate(context);
         JOB_THREAD_POOL = this.threadPool;
+        // start background thread which loads jobs from the repository
+        this.threadPool.execute(new Runnable() {
+            public void run() {
+                loadJobsInTheBackground();
+            }
+        });
     }
 
     /**
@@ -241,6 +253,31 @@
         return buffer.toString();
     }
 
+    private void loadJobsInTheBackground() {
+        // give the system some time to start
+        try {
+            Thread.sleep(1000 * 30); // 30 secs
+        } catch (InterruptedException e) {
+            this.ignoreException(e);
+        }
+        // are we still running?
+        if ( this.running ) {
+            long loadSince = -1;
+            do {
+                loadSince = this.loadJobs(loadSince);
+                if ( this.running && loadSince > -1 ) {
+                    do {
+                        try {
+                            Thread.sleep(1000 * 240);
+                        } catch (InterruptedException e) {
+                            this.ignoreException(e);
+                        }
+                    } while ( this.running && this.queue.size() > 
this.loadThreshold );
+                }
+            } while (this.running && loadSince > -1);
+        }
+    }
+
     /**
      * This method is invoked periodically.
      * @see java.lang.Runnable#run()
@@ -450,15 +487,8 @@
                                   null,
                                   new String[] {this.getEventNodeType()},
                                   true);
-        // give the system some time to start
-        try {
-            Thread.sleep(1000 * 30); // 30 secs
-        } catch (InterruptedException e) {
-            this.ignoreException(e);
-        }
         // load unprocessed jobs from repository
         if ( this.running ) {
-            this.loadJobs();
             logger.info("Apache Sling Job Event Handler started.");
             logger.debug("Job Handler Configuration: 
(sleepTime={}secs,maxJobRetries={},waitForAck={}ms,maximumParallelJobs={},cleanupPeriod={}min)",
                     new Object[] {sleepTime, 
maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod});
@@ -1083,47 +1113,71 @@
      * Load all active jobs from the repository.
      * @throws RepositoryException
      */
-    private void loadJobs() {
-        try {
-            final QueryManager qManager = 
this.backgroundSession.getWorkspace().getQueryManager();
-            final StringBuilder buffer = new StringBuilder("/jcr:root");
-            buffer.append(this.repositoryPath);
-            buffer.append("//element(*, ");
-            buffer.append(this.getEventNodeType());
-            buffer.append(") order by @");
-            buffer.append(EventHelper.NODE_PROPERTY_CREATED);
-            buffer.append(" ascending");
-            final Query q = qManager.createQuery(buffer.toString(), 
Query.XPATH);
-            final NodeIterator result = q.execute().getNodes();
-            while ( result.hasNext() ) {
-                final Node eventNode = result.nextNode();
-                if ( !eventNode.isLocked() && 
!eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
-                    final String nodePath = eventNode.getPath();
-                    try {
-                        final Event event = this.readEvent(eventNode);
-                        final EventInfo info = new EventInfo();
-                        info.event = event;
-                        info.nodePath = nodePath;
+    private long loadJobs(final long since) {
+        long eventCreated = since;
+        final long maxLoad = (since == -1 ? this.maxLoadJobs : 
this.maxLoadJobs - this.queue.size());
+        // sanity check
+        if ( maxLoad > 0 ) {
+            try {
+                final QueryManager qManager = 
this.backgroundSession.getWorkspace().getQueryManager();
+                final StringBuilder buffer = new StringBuilder("/jcr:root");
+                buffer.append(this.repositoryPath);
+                buffer.append("//element(*, ");
+                buffer.append(this.getEventNodeType());
+                buffer.append(")");
+                if ( since != -1 ) {
+                    final Calendar beforeDate = Calendar.getInstance();
+                    beforeDate.setTimeInMillis(since);
+                    final String dateString = ISO8601.format(beforeDate);
+                    buffer.append("[@");
+                    buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+                    buffer.append(" >= xs:dateTime('");
+                    buffer.append(dateString);
+                    buffer.append("')]");
+                }
+                buffer.append(" order by @");
+                buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+                buffer.append(" ascending");
+                final Query q = qManager.createQuery(buffer.toString(), 
Query.XPATH);
+                final NodeIterator result = q.execute().getNodes();
+                long count = 0;
+                while ( result.hasNext() && count < maxLoad ) {
+                    final Node eventNode = result.nextNode();
+                    if ( !eventNode.isLocked() && 
!eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+                        count++;
+                        eventCreated = 
eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
+                        final String nodePath = eventNode.getPath();
                         try {
-                            this.queue.put(info);
-                        } catch (InterruptedException e) {
-                            // we ignore this exception as this should never 
occur
-                            this.ignoreException(e);
-                        }
-                    } catch (ClassNotFoundException cnfe) {
-                        // store path for lazy loading
-                        synchronized ( this.unloadedJobs ) {
-                            this.unloadedJobs.add(nodePath);
+                            final Event event = this.readEvent(eventNode);
+                            final EventInfo info = new EventInfo();
+                            info.event = event;
+                            info.nodePath = nodePath;
+                            try {
+                                this.queue.put(info);
+                            } catch (InterruptedException e) {
+                                // we ignore this exception as this should 
never occur
+                                this.ignoreException(e);
+                            }
+                        } catch (ClassNotFoundException cnfe) {
+                            // store path for lazy loading
+                            synchronized ( this.unloadedJobs ) {
+                                this.unloadedJobs.add(nodePath);
+                            }
+                            this.ignoreException(cnfe);
+                        } catch (RepositoryException re) {
+                            this.logger.error("Unable to load stored job from 
" + nodePath, re);
                         }
-                        this.ignoreException(cnfe);
-                    } catch (RepositoryException re) {
-                        this.logger.error("Unable to load stored job from " + 
nodePath, re);
                     }
                 }
+                // have we processed all jobs?
+                if ( !result.hasNext() ) {
+                    eventCreated = -1;
+                }
+            } catch (RepositoryException re) {
+                this.logger.error("Exception during initial loading of stored 
jobs.", re);
             }
-        } catch (RepositoryException re) {
-            this.logger.error("Exception during initial loading of stored 
jobs.", re);
         }
+        return eventCreated;
     }
 
     /**


Reply via email to