Author: cziegeler
Date: Thu Feb 11 10:30:14 2010
New Revision: 908911
URL: http://svn.apache.org/viewvc?rev=908911&view=rev
Log:
SLING-1369 : Make the maximum number of job queues configurable
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908911&r1=908910&r2=908911&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Thu Feb 11 10:30:14 2010
@@ -120,12 +120,18 @@
/** Default nubmer of parallel jobs. */
private static final long DEFAULT_MAXIMUM_PARALLEL_JOBS = 15;
+ /** Default nubmer of job queues. */
+ private static final int DEFAULT_MAXIMUM_JOB_QUEUES = 10;
+
@Property(longValue=DEFAULT_MAXIMUM_PARALLEL_JOBS)
private static final String CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS =
"max.parallel.jobs";
@Property(longValue=DEFAULT_WAIT_FOR_ACK)
private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack";
+ @Property(intValue=DEFAULT_MAXIMUM_JOB_QUEUES)
+ private static final String CONFIG_PROPERTY_MAXIMUM_JOB_QUEUES =
"max.job.queues";
+
/** We check every 30 secs by default. */
private long sleepTime;
@@ -180,6 +186,9 @@
/** Number of jobs to load from the repository on startup in one go. */
private long maxLoadJobs;
+ /** Number of allowed job queues */
+ private int maxJobQueues;
+
/** Default maximum load jobs. */
private static final long DEFAULT_MAXIMUM_LOAD_JOBS = 1000;
@@ -234,6 +243,7 @@
this.loadThreshold =
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_LOAD_THREASHOLD),
DEFAULT_LOAD_THRESHOLD);
this.backgroundLoadDelay =
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY),
DEFAULT_BACKGROUND_LOAD_DELAY);
this.backgroundCheckDelay =
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_CHECK_DELAY),
DEFAULT_BACKGROUND_CHECK_DELAY);
+ this.maxJobQueues =
OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAXIMUM_JOB_QUEUES),
DEFAULT_MAXIMUM_JOB_QUEUES);
this.componentContext = context;
super.activate(context);
JOB_THREAD_POOL = this.threadPool;
@@ -555,8 +565,9 @@
// load unprocessed jobs from repository
if ( this.running ) {
logger.info("Apache Sling Job Event Handler started.");
- logger.debug("Job Handler Configuration: (sleepTime={} secs,
maxJobRetries={}, waitForAck={} ms, maximumParallelJobs={}, cleanupPeriod={}
min)",
- new Object[] {sleepTime,
maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod});
+ logger.debug("Job Handler Configuration: (sleepTime={} secs,
maxJobRetries={}," +
+ " waitForAck={} ms, maximumParallelJobs={},
cleanupPeriod={} min, maxJobQueues={})",
+ new Object[] {sleepTime,
maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod,maxJobQueues});
} else {
final ComponentContext ctx = this.componentContext;
// deactivate
@@ -596,43 +607,51 @@
if ( info != null &&
info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
final String queueName =
(String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
synchronized ( this.jobQueues ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Queuing job {} into queue {}.",
EventUtil.toString(info.event), queueName);
- }
BlockingQueue<EventInfo> jobQueue =
this.jobQueues.get(queueName);
if ( jobQueue == null ) {
- final boolean orderedQueue =
info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
- final JobBlockingQueue jq = new
JobBlockingQueue(queueName, orderedQueue, this.logger);
- jobQueue = jq;
- this.jobQueues.put(queueName, jq);
- // Start background thread
- this.threadPool.execute(new Runnable() {
-
- /**
- * @see java.lang.Runnable#run()
- */
- public void run() {
- while ( running && !jq.isFinished() ) {
- logger.info("Starting {}job queue {}",
(orderedQueue ? "ordered " : ""), queueName);
- try {
- runJobQueue(queueName, jq);
- } catch (Throwable t) {
- logger.error("Job queue stopped
with exception: " + t.getMessage() + ". Restarting.", t);
+ // check if we have exceeded the maximum number of
job queues
+ if ( this.jobQueues.size() >= this.maxJobQueues ) {
+ this.logger.warn("Unable to create new job
queue named {} as there are already {} job queues." +
+ " Try to increase the maximum number
of job queues!", queueName, this.jobQueues.size());
+ } else {
+ final boolean orderedQueue =
info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+ final JobBlockingQueue jq = new
JobBlockingQueue(queueName, orderedQueue, this.logger);
+ jobQueue = jq;
+ this.jobQueues.put(queueName, jq);
+ // Start background thread
+ this.threadPool.execute(new Runnable() {
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ while ( running && !jq.isFinished() ) {
+ logger.info("Starting {}job queue
{}", (orderedQueue ? "ordered " : ""), queueName);
+ try {
+ runJobQueue(queueName, jq);
+ } catch (Throwable t) {
+ logger.error("Job queue
stopped with exception: " + t.getMessage() + ". Restarting.", t);
+ }
}
}
- }
- });
+ });
+ }
}
- try {
- jobQueue.put(info);
- } catch (InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
+ if ( jobQueue != null ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Queuing job {} into queue {}.",
EventUtil.toString(info.event), queueName);
+ }
+ try {
+ jobQueue.put(info);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
+ // don't process this here
+ info = null;
}
}
- // don't process this here
- info = null;
}
// if we still have a job, process it
Modified:
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=908911&r1=908910&r2=908911&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
(original)
+++
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Thu Feb 11 10:30:14 2010
@@ -90,6 +90,10 @@
load.checkdelay.description = The background loader sleeps this time of
seconds before \
checking the repository for jobs. Default value is 240 seconds.
+max.job.queues.name = Max Job Queues
+max.job.queues.description = The maximum number of job queues (default is 10).
\
+ If this number is exceeded all jobs for a new job queue are put into the main
queue.
+
#
# Event Pool
event.pool.name = Apache Sling Event Thread Pool