Author: cziegeler
Date: Thu Mar 19 07:32:42 2015
New Revision: 1667673
URL: http://svn.apache.org/r1667673
Log:
SLING-4481 : Reduce the number of controller threads for queue
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1667673&r1=1667672&r2=1667673&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Thu Mar 19 07:32:42 2015
@@ -502,12 +502,14 @@ public class JobQueueImpl
}
private static final class RescheduleInfo {
- public boolean reschedule = false;
- public long processingTime;
+ public boolean reschedule = false;
+ public long processingTime;
+ public Job.JobState state;
}
private RescheduleInfo handleReschedule(final JobHandler handler, final Job.JobState resultState) {
final RescheduleInfo info = new RescheduleInfo();
+ info.state = resultState;
switch ( resultState ) {
case SUCCEEDED : // job is finished
if ( this.logger.isDebugEnabled() ) {
@@ -543,6 +545,9 @@ public class JobQueueImpl
break;
}
+ if ( info.state == Job.JobState.QUEUED && !info.reschedule ) {
+ info.state = Job.JobState.GIVEN_UP;
+ }
return info;
}
@@ -550,11 +555,9 @@ public class JobQueueImpl
* Handle job finish and determine whether to reschedule or cancel the job
*/
private boolean finishedJob(final String jobId,
- Job.JobState resultState,
+ final Job.JobState resultState,
final boolean isAsync) {
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Received finish for job {}, resultState={}", jobId, resultState);
- }
+ this.logger.debug("Received finish for job {}, resultState={}", jobId, resultState);
// get job handler
final JobHandler handler;
@@ -569,22 +572,17 @@ public class JobQueueImpl
}
if ( handler == null ) {
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("This job has never been started by this queue: {}", jobId);
- }
+ this.logger.warn("This job has never been started by this queue: {}", jobId);
return false;
}
// handle the reschedule, a new job might be returned with updated reschedule info!
final RescheduleInfo rescheduleInfo = this.handleReschedule(handler, resultState);
- if ( resultState == Job.JobState.QUEUED && !rescheduleInfo.reschedule ) {
- resultState = Job.JobState.GIVEN_UP;
- }
if ( !rescheduleInfo.reschedule ) {
// we keep cancelled jobs and succeeded jobs if the queue is configured like this.
- final boolean keepJobs = resultState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
- handler.finished(resultState, keepJobs, rescheduleInfo.processingTime);
+ final boolean keepJobs = rescheduleInfo.state != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
+ handler.finished(rescheduleInfo.state, keepJobs, rescheduleInfo.processingTime);
} else {
this.reschedule(handler);
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1667673&r1=1667672&r2=1667673&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Thu Mar 19 07:32:42 2015
@@ -124,7 +124,7 @@ public class QueueManager
private final AtomicBoolean isActive = new AtomicBoolean(false);
/** The queue services. */
- private QueueServices queueServices;
+ private volatile QueueServices queueServices;
/**
* Activate this component.