Author: cziegeler
Date: Wed Jul 17 07:38:59 2013
New Revision: 1504035

URL: http://svn.apache.org/r1504035
Log:
SLING-2972 :  Improve processing performance if job is processed locally 

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java?rev=1504035&r1=1504034&r2=1504035&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java Wed Jul 17 07:38:59 2013
@@ -80,7 +80,7 @@ public class BackgroundLoader implements
     private final Set<String> unloadedJobs = new HashSet<String>();
 
     /** A local queue for handling new jobs. */
-    private final BlockingQueue<String> actionQueue = new 
LinkedBlockingQueue<String>();
+    private final BlockingQueue<Object> actionQueue = new 
LinkedBlockingQueue<Object>();
 
     /** Boolean to detect the initial start. */
     private boolean firstRun = true ;
@@ -205,36 +205,41 @@ public class BackgroundLoader implements
             }
             // and finally process the action queue
             while ( this.isRunning() ) {
-                String path = null;
+                Object nextPathOrJob = null;
                 try {
-                    path = this.actionQueue.take();
+                    nextPathOrJob = this.actionQueue.take();
                 } catch (final InterruptedException e) {
                     this.ignoreException(e);
                 }
-                if ( path != null && !END_TOKEN.equals(path) && 
this.isRunning() ) {
-                    ResourceResolver resolver = null;
-                    try {
-                        resolver = 
this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-                        final Resource resource = resolver.getResource(path);
-                        if ( 
ResourceHelper.RESOURCE_TYPE_JOB.equals(resource.getResourceType()) ) {
-                            this.logger.debug("Reading local job from {}", 
path);
-                            final JobImpl job = 
this.jobManager.readJob(resource);
-                            if ( job != null ) {
-                                if ( job.hasReadErrors() ) {
-                                    synchronized ( this.unloadedJobs ) {
-                                        this.unloadedJobs.add(path);
+                if ( nextPathOrJob instanceof JobImpl ) {
+                    this.jobManager.process((JobImpl)nextPathOrJob);
+                } else if ( nextPathOrJob instanceof String ) {
+                    final String path = (String)nextPathOrJob;
+                    if ( !END_TOKEN.equals(path) && this.isRunning() ) {
+                        ResourceResolver resolver = null;
+                        try {
+                            resolver = 
this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                            final Resource resource = 
resolver.getResource(path);
+                            if ( 
ResourceHelper.RESOURCE_TYPE_JOB.equals(resource.getResourceType()) ) {
+                                this.logger.debug("Reading local job from {}", 
path);
+                                final JobImpl job = 
this.jobManager.readJob(resource);
+                                if ( job != null ) {
+                                    if ( job.hasReadErrors() ) {
+                                        synchronized ( this.unloadedJobs ) {
+                                            this.unloadedJobs.add(path);
+                                        }
+                                    } else {
+                                        this.jobManager.process(job);
                                     }
-                                } else {
-                                    this.jobManager.process(job);
                                 }
                             }
-                        }
-                    } catch ( final LoginException le ) {
-                        // administrative login should always work
-                        this.ignoreException(le);
-                    } finally {
-                        if ( resolver != null ) {
-                            resolver.close();
+                        } catch ( final LoginException le ) {
+                            // administrative login should always work
+                            this.ignoreException(le);
+                        } finally {
+                            if ( resolver != null ) {
+                                resolver.close();
+                            }
                         }
                     }
                 }
@@ -388,4 +393,19 @@ public class BackgroundLoader implements
             }
         }
     }
+
+    /**
+     * Add a job to the load job queue if the instance is running.
+     */
+    public void addJob(final JobImpl job) {
+        synchronized ( loadLock ) {
+            if ( isRunning() ) {
+                try {
+                    this.actionQueue.put(job);
+                } catch (final InterruptedException e) {
+                    this.ignoreException(e);
+                }
+            }
+        }
+    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1504035&r1=1504034&r2=1504035&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Wed Jul 17 07:38:59 2013
@@ -53,7 +53,7 @@ public class JobHandler {
     }
 
     public boolean remove() {
-        return this.jobManager.remove(this);
+        return this.jobManager.remove(this.job);
     }
 
     public void reassign() {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1504035&r1=1504034&r2=1504035&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Wed Jul 17 07:38:59 2013
@@ -216,7 +216,7 @@ public class JobImpl implements Job {
     /**
      * Update information about the queue.
      */
-    public void updateQueue(final Queue queue) {
+    public void updateQueueInfo(final Queue queue) {
         this.properties.put(Job.PROPERTY_JOB_QUEUE_NAME, queue.getName());
         this.properties.put(Job.PROPERTY_JOB_RETRIES, 
queue.getConfiguration().getMaxRetries());
         this.properties.put(Job.PROPERTY_JOB_PRIORITY, 
queue.getConfiguration().getPriority());

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1504035&r1=1504034&r2=1504035&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Jul 17 07:38:59 2013
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -157,6 +159,9 @@ public class JobManagerImpl
     /** Statistics per topic. */
     private final ConcurrentMap<String, TopicStatistics> topicStatistics = new 
ConcurrentHashMap<String, TopicStatistics>();
 
+    /** Set of paths directly added as jobs - these will be ignored during observation handling. */
+    private final Set<String> directlyAddedPaths = new HashSet<String>();
+
     /**
      * Activate this component.
      * @param props Configuration properties
@@ -261,7 +266,7 @@ public class JobManagerImpl
         // invoke maintenance task
         final MaintenanceTask task = this.maintenanceTask;
         if ( task != null ) {
-            task.run(this.topologyCapabilities, this.queueConfigManager, 
this.schedulerRuns);
+            task.run(this.topologyCapabilities, this.queueConfigManager, 
this.schedulerRuns - 1);
         }
         logger.debug("Job manager maintenance: Finished #{}", 
this.schedulerRuns);
     }
@@ -274,8 +279,6 @@ public class JobManagerImpl
      * @param job The job
      */
     void process(final JobImpl job) {
-        final JobHandler handler = new JobHandler(job, this);
-
         // check if we still are able to process this job
         final JobConsumer consumer = 
this.jobConsumerManager.getConsumer(job.getTopic());
         boolean reassign = false;
@@ -285,19 +288,19 @@ public class JobManagerImpl
         }
 
         // get the queue configuration
-        final QueueInfo queueInfo = 
queueConfigManager.getQueueInfo(handler.getJob().getTopic());
+        final QueueInfo queueInfo = 
queueConfigManager.getQueueInfo(job.getTopic());
         final InternalQueueConfiguration config = queueInfo.queueConfiguration;
 
         // Sanity check if queue configuration has changed
         if ( config.getType() == QueueConfiguration.Type.DROP ) {
             if ( logger.isDebugEnabled() ) {
-                logger.debug("Dropping job due to configuration of queue {} : 
{}", queueInfo.queueName, Utility.toString(handler.getJob()));
+                logger.debug("Dropping job due to configuration of queue {} : 
{}", queueInfo.queueName, Utility.toString(job));
             }
-            handler.remove();
+            this.remove(job);
         } else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
             if ( !reassign ) {
                 if ( logger.isDebugEnabled() ) {
-                    logger.debug("Ignoring job due to configuration of queue 
{} : {}", queueInfo.queueName, Utility.toString(handler.getJob()));
+                    logger.debug("Ignoring job due to configuration of queue 
{} : {}", queueInfo.queueName, Utility.toString(job));
                 }
             }
         } else {
@@ -328,8 +331,8 @@ public class JobManagerImpl
                         }
                         if ( queue == null ) {
                             // this is just a sanity check, actually we can 
never get here
-                            logger.warn("Ignoring event due to unknown queue 
type of queue {} : {}", queueInfo.queueName, 
Utility.toString(handler.getJob()));
-                            handler.remove();
+                            logger.warn("Ignoring event due to unknown queue 
type of queue {} : {}", queueInfo.queueName, Utility.toString(job));
+                            this.remove(job);
                         } else {
                             queues.put(queueInfo.queueName, queue);
                             ((QueuesMBeanImpl)queuesMBean).sendEvent(new 
QueueStatusEvent(queue, null));
@@ -340,7 +343,9 @@ public class JobManagerImpl
 
                 // and put job
                 if ( queue != null ) {
-                    handler.getJob().updateQueue(queue);
+                    job.updateQueueInfo(queue);
+                    final JobHandler handler = new JobHandler(job, this);
+
                     queue.process(handler);
                 }
             }
@@ -443,6 +448,11 @@ public class JobManagerImpl
             final String rt = (String) 
event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
             if ( (rt == null || ResourceHelper.RESOURCE_TYPE_JOB.equals(rt)) &&
                  this.configuration.isLocalJob(path) ) {
+                synchronized ( this.directlyAddedPaths ) {
+                    if ( directlyAddedPaths.remove(path) ) {
+                        return;
+                    }
+                }
                 this.backgroundLoader.loadJob(path);
             }
         } else if ( 
ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
@@ -1066,13 +1076,13 @@ public class JobManagerImpl
      * @param info
      * @return
      */
-    public boolean remove(final JobHandler info) {
+    public boolean remove(final JobImpl job) {
         ResourceResolver resolver = null;
         try {
             resolver = 
this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-            final Resource jobResource = 
resolver.getResource(info.getJob().getResourcePath());
+            final Resource jobResource = 
resolver.getResource(job.getResourcePath());
             if ( jobResource != null ) {
-                Utility.sendNotification(this.eventAdmin, 
JobUtil.TOPIC_JOB_CANCELLED, info.getJob(), null);
+                Utility.sendNotification(this.eventAdmin, 
JobUtil.TOPIC_JOB_CANCELLED, job, null);
                 try {
                     resolver.delete(jobResource);
                     resolver.commit();
@@ -1176,7 +1186,13 @@ public class JobManagerImpl
         return hasLock;
     }
 
-
+    /**
+     * Persist the job in the resource tree
+     * @param jobTopic The required job topic
+     * @param jobName The optional job name
+     * @param jobProperties The optional job properties
+     * @return The persisted job or <code>null</code>.
+     */
 
     private Job addJobInteral(final String jobTopic, final String jobName, 
final Map<String, Object> jobProperties) {
         final QueueInfo info = this.queueConfigManager.getQueueInfo(jobTopic);
@@ -1206,11 +1222,17 @@ public class JobManagerImpl
                 try {
                     resolver = 
this.resourceResolverFactory.getAdministrativeResourceResolver(null);
 
-                    return this.writeJob(resolver,
+                    final JobImpl job = this.writeJob(resolver,
                             jobTopic,
                             jobName,
                             jobProperties,
                             info);
+                    if ( job != null ) {
+                        if ( configuration.isLocalJob(job.getResourcePath()) ) 
{
+                            this.backgroundLoader.addJob(job);
+                        }
+                    }
+                    return job;
                 } catch (final PersistenceException re ) {
                     // something went wrong, so let's log it
                     this.logger.error("Exception during persisting new job '" 
+ Utility.toString(jobTopic, jobName, jobProperties) + "'", re);
@@ -1234,7 +1256,7 @@ public class JobManagerImpl
      * @param info The queue information (queue name etc.)
      * @throws PersistenceException
      */
-    private Job writeJob(final ResourceResolver resolver,
+    private JobImpl writeJob(final ResourceResolver resolver,
             final String jobTopic,
             final String jobName,
             final Map<String, Object> jobProperties,
@@ -1278,6 +1300,9 @@ public class JobManagerImpl
         if ( logger.isDebugEnabled() ) {
             logger.debug("Storing new job {} at {}", properties, path);
         }
+        synchronized ( this.directlyAddedPaths ) {
+            this.directlyAddedPaths.add(path);
+        }
         ResourceHelper.getOrCreateResource(resolver,
                 path,
                 properties);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1504035&r1=1504034&r2=1504035&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Jul 17 07:38:59 2013
@@ -437,11 +437,11 @@ public abstract class AbstractJobQueue
     /**
      * Add a new job to the queue.
      */
-    public void process(final JobHandler event) {
+    public void process(final JobHandler handler) {
         this.closeMarker.set(false);
-        this.put(event);
-        event.queued = System.currentTimeMillis();
+        handler.queued = System.currentTimeMillis();
         this.incQueued();
+        this.put(handler);
     }
 
     /**


Reply via email to