Author: cziegeler
Date: Thu Feb 11 10:30:14 2010
New Revision: 908911

URL: http://svn.apache.org/viewvc?rev=908911&view=rev
Log:
SLING-1369 : Make the maximum number of job queues configurable

Modified:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
    
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908911&r1=908910&r2=908911&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
 Thu Feb 11 10:30:14 2010
@@ -120,12 +120,18 @@
     /** Default nubmer of parallel jobs. */
     private static final long DEFAULT_MAXIMUM_PARALLEL_JOBS = 15;
 
+    /** Default nubmer of job queues. */
+    private static final int DEFAULT_MAXIMUM_JOB_QUEUES = 10;
+
     @Property(longValue=DEFAULT_MAXIMUM_PARALLEL_JOBS)
     private static final String CONFIG_PROPERTY_MAXIMUM_PARALLEL_JOBS = 
"max.parallel.jobs";
 
     @Property(longValue=DEFAULT_WAIT_FOR_ACK)
     private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack";
 
+    @Property(intValue=DEFAULT_MAXIMUM_JOB_QUEUES)
+    private static final String CONFIG_PROPERTY_MAXIMUM_JOB_QUEUES = 
"max.job.queues";
+
     /** We check every 30 secs by default. */
     private long sleepTime;
 
@@ -180,6 +186,9 @@
     /** Number of jobs to load from the repository on startup in one go. */
     private long maxLoadJobs;
 
+    /** Number of allowed job queues */
+    private int maxJobQueues;
+
     /** Default maximum load jobs. */
     private static final long DEFAULT_MAXIMUM_LOAD_JOBS = 1000;
 
@@ -234,6 +243,7 @@
         this.loadThreshold = 
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_LOAD_THREASHOLD), 
DEFAULT_LOAD_THRESHOLD);
         this.backgroundLoadDelay = 
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY), 
DEFAULT_BACKGROUND_LOAD_DELAY);
         this.backgroundCheckDelay = 
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_BACKGROUND_CHECK_DELAY), 
DEFAULT_BACKGROUND_CHECK_DELAY);
+        this.maxJobQueues = 
OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAXIMUM_JOB_QUEUES), 
DEFAULT_MAXIMUM_JOB_QUEUES);
         this.componentContext = context;
         super.activate(context);
         JOB_THREAD_POOL = this.threadPool;
@@ -555,8 +565,9 @@
         // load unprocessed jobs from repository
         if ( this.running ) {
             logger.info("Apache Sling Job Event Handler started.");
-            logger.debug("Job Handler Configuration: (sleepTime={} secs, 
maxJobRetries={}, waitForAck={} ms, maximumParallelJobs={}, cleanupPeriod={} 
min)",
-                    new Object[] {sleepTime, 
maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod});
+            logger.debug("Job Handler Configuration: (sleepTime={} secs, 
maxJobRetries={}," +
+                    " waitForAck={} ms, maximumParallelJobs={}, 
cleanupPeriod={} min, maxJobQueues={})",
+                    new Object[] {sleepTime, 
maxJobRetries,waitForAckMs,maximumParallelJobs,cleanupPeriod,maxJobQueues});
         } else {
             final ComponentContext ctx = this.componentContext;
             // deactivate
@@ -596,43 +607,51 @@
                 if ( info != null && 
info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
                     final String queueName = 
(String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
                     synchronized ( this.jobQueues ) {
-                        if ( logger.isDebugEnabled() ) {
-                            logger.debug("Queuing job {} into queue {}.", 
EventUtil.toString(info.event), queueName);
-                        }
                         BlockingQueue<EventInfo> jobQueue = 
this.jobQueues.get(queueName);
                         if ( jobQueue == null ) {
-                            final boolean orderedQueue = 
info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
-                            final JobBlockingQueue jq = new 
JobBlockingQueue(queueName, orderedQueue, this.logger);
-                            jobQueue = jq;
-                            this.jobQueues.put(queueName, jq);
-                            // Start background thread
-                            this.threadPool.execute(new Runnable() {
-
-                                /**
-                                 * @see java.lang.Runnable#run()
-                                 */
-                                public void run() {
-                                    while ( running && !jq.isFinished() ) {
-                                        logger.info("Starting {}job queue {}", 
(orderedQueue ? "ordered " : ""), queueName);
-                                        try {
-                                            runJobQueue(queueName, jq);
-                                        } catch (Throwable t) {
-                                            logger.error("Job queue stopped 
with exception: " + t.getMessage() + ". Restarting.", t);
+                            // check if we have exceeded the maximum number of 
job queues
+                            if ( this.jobQueues.size() >= this.maxJobQueues ) {
+                                this.logger.warn("Unable to create new job 
queue named {} as there are already {} job queues." +
+                                        " Try to increase the maximum number 
of job queues!", queueName, this.jobQueues.size());
+                            } else {
+                                final boolean orderedQueue = 
info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+                                final JobBlockingQueue jq = new 
JobBlockingQueue(queueName, orderedQueue, this.logger);
+                                jobQueue = jq;
+                                this.jobQueues.put(queueName, jq);
+                                // Start background thread
+                                this.threadPool.execute(new Runnable() {
+
+                                    /**
+                                     * @see java.lang.Runnable#run()
+                                     */
+                                    public void run() {
+                                        while ( running && !jq.isFinished() ) {
+                                            logger.info("Starting {}job queue 
{}", (orderedQueue ? "ordered " : ""), queueName);
+                                            try {
+                                                runJobQueue(queueName, jq);
+                                            } catch (Throwable t) {
+                                                logger.error("Job queue 
stopped with exception: " + t.getMessage() + ". Restarting.", t);
+                                            }
                                         }
                                     }
-                                }
 
-                            });
+                                });
+                            }
                         }
-                        try {
-                            jobQueue.put(info);
-                        } catch (InterruptedException e) {
-                            // this should never happen
-                            this.ignoreException(e);
+                        if ( jobQueue != null ) {
+                            if ( logger.isDebugEnabled() ) {
+                                logger.debug("Queuing job {} into queue {}.", 
EventUtil.toString(info.event), queueName);
+                            }
+                            try {
+                                jobQueue.put(info);
+                            } catch (InterruptedException e) {
+                                // this should never happen
+                                this.ignoreException(e);
+                            }
+                            // don't process this here
+                            info = null;
                         }
                     }
-                    // don't process this here
-                    info = null;
                 }
 
                 // if we still have a job, process it

Modified: 
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=908911&r1=908910&r2=908911&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
 Thu Feb 11 10:30:14 2010
@@ -90,6 +90,10 @@
 load.checkdelay.description = The background loader sleeps this time of 
seconds before \
  checking the repository for jobs. Default value is 240 seconds.
 
+max.job.queues.name = Max Job Queues
+max.job.queues.description = The maximum number of job queues (default is 10). 
\
+ If this number is exceeded all jobs for a new job queue are put into the main 
queue.
+
 #
 # Event Pool
 event.pool.name = Apache Sling Event Thread Pool 


Reply via email to