Author: cziegeler
Date: Thu Dec 31 11:19:09 2009
New Revision: 894821
URL: http://svn.apache.org/viewvc?rev=894821&view=rev
Log:
SLING-1002 : Reduce memory consumption and improve startup behaviour of the job
handler
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=894821&r1=894820&r2=894821&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Thu Dec 31 11:19:09 2009
@@ -160,6 +160,12 @@
/** Number of parallel jobs for the main queue. */
private long parallelJobCount;
+ /** Number of jobs to load from the repository on startup in one go. */
+ private long maxLoadJobs;
+
+ /** Threshold - if the queue is lower than this threshold the repository
is checked for events. */
+ private long loadThreshold;
+
/**
* Activate this component.
* @param context
@@ -177,6 +183,12 @@
this.componentContext = context;
super.activate(context);
JOB_THREAD_POOL = this.threadPool;
+ // start background thread which loads jobs from the repository
+ this.threadPool.execute(new Runnable() {
+ public void run() {
+ loadJobsInTheBackground();
+ }
+ });
}
/**
@@ -241,6 +253,31 @@
return buffer.toString();
}
+ private void loadJobsInTheBackground() {
+ // give the system some time to start
+ try {
+ Thread.sleep(1000 * 30); // 30 secs
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ // are we still running?
+ if ( this.running ) {
+ long loadSince = -1;
+ do {
+ loadSince = this.loadJobs(loadSince);
+ if ( this.running && loadSince > -1 ) {
+ do {
+ try {
+ Thread.sleep(1000 * 240);
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ } while ( this.running && this.queue.size() >
this.loadThreshold );
+ }
+ } while (this.running && loadSince > -1);
+ }
+ }
+
/**
* This method is invoked periodically.
* @see java.lang.Runnable#run()
@@ -450,15 +487,8 @@
null,
new String[] {this.getEventNodeType()},
true);
- // give the system some time to start
- try {
- Thread.sleep(1000 * 30); // 30 secs
- } catch (InterruptedException e) {
- this.ignoreException(e);
- }
// load unprocessed jobs from repository
if ( this.running ) {
- this.loadJobs();
logger.info("Apache Sling Job Event Handler started.");
logger.debug("Job Handler Configuration:
(sleepTime={}secs,maxJobRetries={},waitForAck={}ms,maximumParallelJobs={},cleanupPeriod={}min)",
new Object[] {sleepTime,
maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod});
@@ -1083,47 +1113,71 @@
* Load all active jobs from the repository.
* @throws RepositoryException
*/
- private void loadJobs() {
- try {
- final QueryManager qManager =
this.backgroundSession.getWorkspace().getQueryManager();
- final StringBuilder buffer = new StringBuilder("/jcr:root");
- buffer.append(this.repositoryPath);
- buffer.append("//element(*, ");
- buffer.append(this.getEventNodeType());
- buffer.append(") order by @");
- buffer.append(EventHelper.NODE_PROPERTY_CREATED);
- buffer.append(" ascending");
- final Query q = qManager.createQuery(buffer.toString(),
Query.XPATH);
- final NodeIterator result = q.execute().getNodes();
- while ( result.hasNext() ) {
- final Node eventNode = result.nextNode();
- if ( !eventNode.isLocked() &&
!eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
- final String nodePath = eventNode.getPath();
- try {
- final Event event = this.readEvent(eventNode);
- final EventInfo info = new EventInfo();
- info.event = event;
- info.nodePath = nodePath;
+ private long loadJobs(final long since) {
+ long eventCreated = since;
+ final long maxLoad = (since == -1 ? this.maxLoadJobs :
this.maxLoadJobs - this.queue.size());
+ // sanity check
+ if ( maxLoad > 0 ) {
+ try {
+ final QueryManager qManager =
this.backgroundSession.getWorkspace().getQueryManager();
+ final StringBuilder buffer = new StringBuilder("/jcr:root");
+ buffer.append(this.repositoryPath);
+ buffer.append("//element(*, ");
+ buffer.append(this.getEventNodeType());
+ buffer.append(")");
+ if ( since != -1 ) {
+ final Calendar beforeDate = Calendar.getInstance();
+ beforeDate.setTimeInMillis(since);
+ final String dateString = ISO8601.format(beforeDate);
+ buffer.append("[@");
+ buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+ buffer.append(" >= xs:dateTime('");
+ buffer.append(dateString);
+ buffer.append("')]");
+ }
+ buffer.append(" order by @");
+ buffer.append(EventHelper.NODE_PROPERTY_CREATED);
+ buffer.append(" ascending");
+ final Query q = qManager.createQuery(buffer.toString(),
Query.XPATH);
+ final NodeIterator result = q.execute().getNodes();
+ long count = 0;
+ while ( result.hasNext() && count < maxLoad ) {
+ final Node eventNode = result.nextNode();
+ if ( !eventNode.isLocked() &&
!eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
+ count++;
+ eventCreated =
eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
+ final String nodePath = eventNode.getPath();
try {
- this.queue.put(info);
- } catch (InterruptedException e) {
- // we ignore this exception as this should never
occur
- this.ignoreException(e);
- }
- } catch (ClassNotFoundException cnfe) {
- // store path for lazy loading
- synchronized ( this.unloadedJobs ) {
- this.unloadedJobs.add(nodePath);
+ final Event event = this.readEvent(eventNode);
+ final EventInfo info = new EventInfo();
+ info.event = event;
+ info.nodePath = nodePath;
+ try {
+ this.queue.put(info);
+ } catch (InterruptedException e) {
+ // we ignore this exception as this should
never occur
+ this.ignoreException(e);
+ }
+ } catch (ClassNotFoundException cnfe) {
+ // store path for lazy loading
+ synchronized ( this.unloadedJobs ) {
+ this.unloadedJobs.add(nodePath);
+ }
+ this.ignoreException(cnfe);
+ } catch (RepositoryException re) {
+ this.logger.error("Unable to load stored job from
" + nodePath, re);
}
- this.ignoreException(cnfe);
- } catch (RepositoryException re) {
- this.logger.error("Unable to load stored job from " +
nodePath, re);
}
}
+ // have we processed all jobs?
+ if ( !result.hasNext() ) {
+ eventCreated = -1;
+ }
+ } catch (RepositoryException re) {
+ this.logger.error("Exception during initial loading of stored
jobs.", re);
}
- } catch (RepositoryException re) {
- this.logger.error("Exception during initial loading of stored
jobs.", re);
}
+ return eventCreated;
}
/**