Author: cziegeler
Date: Wed Feb 10 15:38:14 2010
New Revision: 908544
URL: http://svn.apache.org/viewvc?rev=908544&view=rev
Log:
SLING-1365 : Limit the number of parallel jobs
First implementation for job queues.
Modified:
sling/trunk/bundles/extensions/event/NOTICE
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java
sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE
Modified: sling/trunk/bundles/extensions/event/NOTICE
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/NOTICE?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/NOTICE (original)
+++ sling/trunk/bundles/extensions/event/NOTICE Wed Feb 10 15:38:14 2010
@@ -1,5 +1,5 @@
Apache Sling Event
-Copyright 2008-2009 The Apache Software Foundation
+Copyright 2008-2010 The Apache Software Foundation
Apache Sling is based on source code originally developed
by Day Software (http://www.day.com/).
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
Wed Feb 10 15:38:14 2010
@@ -72,8 +72,12 @@
public static final String PROPERTY_JOB_ID = "event.job.id";
/** The property to set if a job can be run parallel to any other job.
- * For now the property should only contain the values <code>true</code>
- * or <code>false</code> as a string value.
+ * The following values are supported:
+ * - boolean value <code>true</code> and <code>false</code>
+ * - string value <code>true</code> and <code>false</code>
+ * - integer value higher than 1 - if this is specified jobs are run in
+ * parallel but never more than the specified number.
+ *
* We might want to use different values in the future for enhanced
* parallel job handling. */
public static final String PROPERTY_JOB_PARALLEL = "event.job.parallel";
@@ -93,13 +97,18 @@
/** The property to set to put the jobs into a separate job queue. This
property
* specifies the name of the job queue. If the job queue does not exists
yet
* a new queue is created.
- * If a job queue is used, the jobs are never executed in parallel from this queue!
+ * If a ordered job queue is used, the jobs are never executed in parallel
+ * from this queue! For non ordered queues the {@link #PROPERTY_JOB_PARALLEL}
+ * with an integer value higher than 1 can be used to specify the maximum number
+ * of parallel jobs for this queue.
*/
public static final String PROPERTY_JOB_QUEUE_NAME = "event.job.queuename";
/** If this property is set with any value, the queue processes the jobs
in the same
* order as they have arrived.
- * This property has only an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified.
+ * This property has only an effect if {@link #PROPERTY_JOB_QUEUE_NAME} is specified
+ * and starting with version 2.2 this value is only checked in the first job for this
+ * queue.
*/
public static final String PROPERTY_JOB_QUEUE_ORDERED =
"event.job.queueordered";
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Wed Feb 10 15:38:14 2010
@@ -599,7 +599,7 @@
*/
public void run() {
while ( running && !jq.isFinished() ) {
- logger.info("Starting job queue {}",
queueName);
+ logger.info("Starting {}job queue {}",
(orderedQueue ? "ordered " : ""), queueName);
try {
runJobQueue(queueName, jq);
} catch (Throwable t) {
@@ -650,18 +650,36 @@
}
if ( info != null && this.running && !jobQueue.isFinished() ) {
- synchronized ( jobQueue.getLock()) {
- final EventInfo processInfo = info;
- info = null;
- final Status status = this.executeJob(processInfo,
jobQueue);
- if ( status == Status.SUCCESS ) {
+ final EventInfo processInfo = info;
+ info = null;
+ if ( jobQueue.isOrdered() ) {
+ // if we are ordered we simply wait for the finish
+ synchronized ( jobQueue.getLock()) {
+ final Status status = this.executeJob(processInfo,
jobQueue);
+ if ( status == Status.SUCCESS ) {
+ try {
+ info = jobQueue.waitForFinish();
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ } else if ( status == Status.RESCHEDULE ) {
+ info = jobQueue.reschedule(processInfo,
this.scheduler);
+ }
+ }
+ } else {
+ final int maxJobs =
ParallelInfo.getMaxNumberOfParallelJobs(processInfo.event);
+ synchronized ( jobQueue.getLock() ) {
try {
- info = jobQueue.waitForFinish();
+ jobQueue.acquireSlot(maxJobs);
} catch (InterruptedException e) {
this.ignoreException(e);
}
- } else if ( status == Status.RESCHEDULE ) {
- info = jobQueue.reschedule(processInfo,
this.scheduler);
+ }
+ if ( this.running && !jobQueue.isFinished() ) {
+ final Status status = this.executeJob(processInfo,
jobQueue);
+ if ( status == Status.RESCHEDULE ) {
+ jobQueue.reschedule(processInfo, this.scheduler);
+ }
}
}
}
@@ -1432,7 +1450,11 @@
if ( info != null ) {
reprocessInfo = jobQueue.reschedule(info, this.scheduler);
}
- jobQueue.notifyFinish(reprocessInfo);
+ if ( jobQueue.isOrdered() ) {
+ jobQueue.notifyFinish(reprocessInfo);
+ } else {
+ jobQueue.freeSlot();
+ }
}
}
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
Wed Feb 10 15:38:14 2010
@@ -57,6 +57,8 @@
/** Ordered Queue? */
private final boolean orderedQueue;
+ private volatile int jobCount;
+
public JobBlockingQueue(final String name,
final boolean orderedQueue,
final Logger logger) {
@@ -65,6 +67,10 @@
this.logger = logger;
}
+ /**
+ * Wait for the job to be finished.
+ * This is called if the queue is ordered.
+ */
public EventInfo waitForFinish() throws InterruptedException {
this.isWaiting = true;
this.markForCleanUp = false;
@@ -75,21 +81,56 @@
return object;
}
+ /**
+ * Mark this queue for cleanup.
+ */
public void markForCleanUp() {
if ( !this.isWaiting ) {
this.markForCleanUp = true;
}
}
+ /**
+ * Acquire a processing slot.
+ * This method is called if the queue is not ordered.
+ * @param maxJobs
+ */
+ public void acquireSlot(final int maxJobs) throws InterruptedException {
+ if ( jobCount >= maxJobs ) {
+ this.isWaiting = true;
+ this.markForCleanUp = false;
+ this.lock.wait();
+ this.isWaiting = false;
+ }
+ }
+
+ /**
+ * Free a slot when a job processing is finished.
+ */
+ public void freeSlot() {
+ if ( this.isWaiting ) {
+ this.lock.notify();
+ }
+ }
+
+ /**
+ * Check if this queue is marked for cleanup
+ */
public boolean isMarkedForCleanUp() {
return !this.isWaiting && this.markForCleanUp;
}
+ /**
+ * Notify a finished job - for ordered queues
+ */
public void notifyFinish(EventInfo i) {
this.eventInfo = i;
this.lock.notify();
}
+ /**
+ * Return the lock for this queue.
+ */
public Object getLock() {
return lock;
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/ParallelInfo.java
Wed Feb 10 15:38:14 2010
@@ -92,4 +92,27 @@
}
return ParallelInfo.SERIAL;
}
+
+ /**
+ * Return the maximum number of parallel jobs for named queues.
+ * @param job The job
+ */
+ public static int getMaxNumberOfParallelJobs(final Event job) {
+ Object value = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL);
+ if ( value instanceof String ) {
+ // try to parse the value
+ try {
+ value = Integer.valueOf((String)value);
+ } catch (NumberFormatException ne) {
+ // we ignore this
+ }
+ }
+ if ( value instanceof Number ) {
+ final int result = ((Number)value).intValue();
+ if ( result > 1 ) {
+ return result;
+ }
+ }
+ return 1;
+ }
}
\ No newline at end of file
Modified:
sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE?rev=908544&r1=908543&r2=908544&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE
(original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/META-INF/NOTICE Wed
Feb 10 15:38:14 2010
@@ -1,5 +1,5 @@
Apache Sling Event
-Copyright 2008-2009 The Apache Software Foundation
+Copyright 2008-2010 The Apache Software Foundation
Apache Sling is based on source code originally developed
by Day Software (http://www.day.com/).