Author: cziegeler
Date: Wed Jul 17 07:38:59 2013
New Revision: 1504035
URL: http://svn.apache.org/r1504035
Log:
SLING-2972 : Improve processing performance if job is processed locally
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java?rev=1504035&r1=1504034&r2=1504035&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
Wed Jul 17 07:38:59 2013
@@ -80,7 +80,7 @@ public class BackgroundLoader implements
private final Set<String> unloadedJobs = new HashSet<String>();
/** A local queue for handling new jobs. */
- private final BlockingQueue<String> actionQueue = new
LinkedBlockingQueue<String>();
+ private final BlockingQueue<Object> actionQueue = new
LinkedBlockingQueue<Object>();
/** Boolean to detect the initial start. */
private boolean firstRun = true ;
@@ -205,36 +205,41 @@ public class BackgroundLoader implements
}
// and finally process the action queue
while ( this.isRunning() ) {
- String path = null;
+ Object nextPathOrJob = null;
try {
- path = this.actionQueue.take();
+ nextPathOrJob = this.actionQueue.take();
} catch (final InterruptedException e) {
this.ignoreException(e);
}
- if ( path != null && !END_TOKEN.equals(path) &&
this.isRunning() ) {
- ResourceResolver resolver = null;
- try {
- resolver =
this.resourceResolverFactory.getAdministrativeResourceResolver(null);
- final Resource resource = resolver.getResource(path);
- if (
ResourceHelper.RESOURCE_TYPE_JOB.equals(resource.getResourceType()) ) {
- this.logger.debug("Reading local job from {}",
path);
- final JobImpl job =
this.jobManager.readJob(resource);
- if ( job != null ) {
- if ( job.hasReadErrors() ) {
- synchronized ( this.unloadedJobs ) {
- this.unloadedJobs.add(path);
+ if ( nextPathOrJob instanceof JobImpl ) {
+ this.jobManager.process((JobImpl)nextPathOrJob);
+ } else if ( nextPathOrJob instanceof String ) {
+ final String path = (String)nextPathOrJob;
+ if ( !END_TOKEN.equals(path) && this.isRunning() ) {
+ ResourceResolver resolver = null;
+ try {
+ resolver =
this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Resource resource =
resolver.getResource(path);
+ if (
ResourceHelper.RESOURCE_TYPE_JOB.equals(resource.getResourceType()) ) {
+ this.logger.debug("Reading local job from {}",
path);
+ final JobImpl job =
this.jobManager.readJob(resource);
+ if ( job != null ) {
+ if ( job.hasReadErrors() ) {
+ synchronized ( this.unloadedJobs ) {
+ this.unloadedJobs.add(path);
+ }
+ } else {
+ this.jobManager.process(job);
}
- } else {
- this.jobManager.process(job);
}
}
- }
- } catch ( final LoginException le ) {
- // administrative login should always work
- this.ignoreException(le);
- } finally {
- if ( resolver != null ) {
- resolver.close();
+ } catch ( final LoginException le ) {
+ // administrative login should always work
+ this.ignoreException(le);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
}
}
}
@@ -388,4 +393,19 @@ public class BackgroundLoader implements
}
}
}
+
+ /**
+ * Add an already created job object to the load job queue if the instance is running; the queue-processing loop hands it to the job manager directly instead of reading it by path.
+ */
+ public void addJob(final JobImpl job) {
+ synchronized ( loadLock ) {
+ if ( isRunning() ) {
+ try {
+ this.actionQueue.put(job);
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+ }
+ }
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1504035&r1=1504034&r2=1504035&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
Wed Jul 17 07:38:59 2013
@@ -53,7 +53,7 @@ public class JobHandler {
}
public boolean remove() {
- return this.jobManager.remove(this);
+ return this.jobManager.remove(this.job);
}
public void reassign() {
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1504035&r1=1504034&r2=1504035&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
Wed Jul 17 07:38:59 2013
@@ -216,7 +216,7 @@ public class JobImpl implements Job {
/**
* Update information about the queue.
*/
- public void updateQueue(final Queue queue) {
+ public void updateQueueInfo(final Queue queue) {
this.properties.put(Job.PROPERTY_JOB_QUEUE_NAME, queue.getName());
this.properties.put(Job.PROPERTY_JOB_RETRIES,
queue.getConfiguration().getMaxRetries());
this.properties.put(Job.PROPERTY_JOB_PRIORITY,
queue.getConfiguration().getPriority());
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1504035&r1=1504034&r2=1504035&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
Wed Jul 17 07:38:59 2013
@@ -22,9 +22,11 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -157,6 +159,9 @@ public class JobManagerImpl
/** Statistics per topic. */
private final ConcurrentMap<String, TopicStatistics> topicStatistics = new
ConcurrentHashMap<String, TopicStatistics>();
+ /** Set of paths directly added as jobs - these will be ignored during
observation handling. */
+ private final Set<String> directlyAddedPaths = new HashSet<String>();
+
/**
* Activate this component.
* @param props Configuration properties
@@ -261,7 +266,7 @@ public class JobManagerImpl
// invoke maintenance task
final MaintenanceTask task = this.maintenanceTask;
if ( task != null ) {
- task.run(this.topologyCapabilities, this.queueConfigManager,
this.schedulerRuns);
+ task.run(this.topologyCapabilities, this.queueConfigManager,
this.schedulerRuns - 1);
}
logger.debug("Job manager maintenance: Finished #{}",
this.schedulerRuns);
}
@@ -274,8 +279,6 @@ public class JobManagerImpl
* @param job The job
*/
void process(final JobImpl job) {
- final JobHandler handler = new JobHandler(job, this);
-
// check if we still are able to process this job
final JobConsumer consumer =
this.jobConsumerManager.getConsumer(job.getTopic());
boolean reassign = false;
@@ -285,19 +288,19 @@ public class JobManagerImpl
}
// get the queue configuration
- final QueueInfo queueInfo =
queueConfigManager.getQueueInfo(handler.getJob().getTopic());
+ final QueueInfo queueInfo =
queueConfigManager.getQueueInfo(job.getTopic());
final InternalQueueConfiguration config = queueInfo.queueConfiguration;
// Sanity check if queue configuration has changed
if ( config.getType() == QueueConfiguration.Type.DROP ) {
if ( logger.isDebugEnabled() ) {
- logger.debug("Dropping job due to configuration of queue {} :
{}", queueInfo.queueName, Utility.toString(handler.getJob()));
+ logger.debug("Dropping job due to configuration of queue {} :
{}", queueInfo.queueName, Utility.toString(job));
}
- handler.remove();
+ this.remove(job);
} else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
if ( !reassign ) {
if ( logger.isDebugEnabled() ) {
- logger.debug("Ignoring job due to configuration of queue
{} : {}", queueInfo.queueName, Utility.toString(handler.getJob()));
+ logger.debug("Ignoring job due to configuration of queue
{} : {}", queueInfo.queueName, Utility.toString(job));
}
}
} else {
@@ -328,8 +331,8 @@ public class JobManagerImpl
}
if ( queue == null ) {
// this is just a sanity check, actually we can
never get here
- logger.warn("Ignoring event due to unknown queue
type of queue {} : {}", queueInfo.queueName,
Utility.toString(handler.getJob()));
- handler.remove();
+ logger.warn("Ignoring event due to unknown queue
type of queue {} : {}", queueInfo.queueName, Utility.toString(job));
+ this.remove(job);
} else {
queues.put(queueInfo.queueName, queue);
((QueuesMBeanImpl)queuesMBean).sendEvent(new
QueueStatusEvent(queue, null));
@@ -340,7 +343,9 @@ public class JobManagerImpl
// and put job
if ( queue != null ) {
- handler.getJob().updateQueue(queue);
+ job.updateQueueInfo(queue);
+ final JobHandler handler = new JobHandler(job, this);
+
queue.process(handler);
}
}
@@ -443,6 +448,11 @@ public class JobManagerImpl
final String rt = (String)
event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
if ( (rt == null || ResourceHelper.RESOURCE_TYPE_JOB.equals(rt)) &&
this.configuration.isLocalJob(path) ) {
+ synchronized ( this.directlyAddedPaths ) {
+ if ( directlyAddedPaths.remove(path) ) {
+ return;
+ }
+ }
this.backgroundLoader.loadJob(path);
}
} else if (
ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
@@ -1066,13 +1076,13 @@ public class JobManagerImpl
* @param info
* @return
*/
- public boolean remove(final JobHandler info) {
+ public boolean remove(final JobImpl job) {
ResourceResolver resolver = null;
try {
resolver =
this.resourceResolverFactory.getAdministrativeResourceResolver(null);
- final Resource jobResource =
resolver.getResource(info.getJob().getResourcePath());
+ final Resource jobResource =
resolver.getResource(job.getResourcePath());
if ( jobResource != null ) {
- Utility.sendNotification(this.eventAdmin,
JobUtil.TOPIC_JOB_CANCELLED, info.getJob(), null);
+ Utility.sendNotification(this.eventAdmin,
JobUtil.TOPIC_JOB_CANCELLED, job, null);
try {
resolver.delete(jobResource);
resolver.commit();
@@ -1176,7 +1186,13 @@ public class JobManagerImpl
return hasLock;
}
-
+ /**
+ * Persist the job in the resource tree
+ * @param jobTopic The required job topic
+ * @param jobName The optional job name
+ * @param jobProperties The optional job properties
+ * @return The persisted job or <code>null</code>.
+ */
private Job addJobInteral(final String jobTopic, final String jobName,
final Map<String, Object> jobProperties) {
final QueueInfo info = this.queueConfigManager.getQueueInfo(jobTopic);
@@ -1206,11 +1222,17 @@ public class JobManagerImpl
try {
resolver =
this.resourceResolverFactory.getAdministrativeResourceResolver(null);
- return this.writeJob(resolver,
+ final JobImpl job = this.writeJob(resolver,
jobTopic,
jobName,
jobProperties,
info);
+ if ( job != null ) {
+ if ( configuration.isLocalJob(job.getResourcePath()) )
{
+ this.backgroundLoader.addJob(job);
+ }
+ }
+ return job;
} catch (final PersistenceException re ) {
// something went wrong, so let's log it
this.logger.error("Exception during persisting new job '"
+ Utility.toString(jobTopic, jobName, jobProperties) + "'", re);
@@ -1234,7 +1256,7 @@ public class JobManagerImpl
* @param info The queue information (queue name etc.)
* @throws PersistenceException
*/
- private Job writeJob(final ResourceResolver resolver,
+ private JobImpl writeJob(final ResourceResolver resolver,
final String jobTopic,
final String jobName,
final Map<String, Object> jobProperties,
@@ -1278,6 +1300,9 @@ public class JobManagerImpl
if ( logger.isDebugEnabled() ) {
logger.debug("Storing new job {} at {}", properties, path);
}
+ synchronized ( this.directlyAddedPaths ) {
+ this.directlyAddedPaths.add(path);
+ }
ResourceHelper.getOrCreateResource(resolver,
path,
properties);
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1504035&r1=1504034&r2=1504035&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Wed Jul 17 07:38:59 2013
@@ -437,11 +437,11 @@ public abstract class AbstractJobQueue
/**
* Add a new job to the queue.
*/
- public void process(final JobHandler event) {
+ public void process(final JobHandler handler) {
this.closeMarker.set(false);
- this.put(event);
- event.queued = System.currentTimeMillis();
+ handler.queued = System.currentTimeMillis();
this.incQueued();
+ this.put(handler);
}
/**