Author: cziegeler
Date: Wed Feb 10 16:34:47 2010
New Revision: 908572
URL: http://svn.apache.org/viewvc?rev=908572&view=rev
Log:
SLING-1365 : Limit the number of parallel jobs
Add more debug logging.
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=908572&r1=908571&r2=908572&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
Wed Feb 10 16:34:47 2010
@@ -156,6 +156,7 @@
public static final String TOPIC_JOB_FINISHED =
"org/apache/sling/event/notification/job/FINISHED";
/** Asynchronous notification event when a job failed.
+ * If a job execution fails, it is rescheduled for another try.
* The property {...@link #PROPERTY_NOTIFICATION_JOB} contains the job
event and the
* property {...@link org.osgi.service.event.EventConstants#TIMESTAMP}
contains the
* timestamp of the event (as a Long).
@@ -163,6 +164,7 @@
public static final String TOPIC_JOB_FAILED =
"org/apache/sling/event/notification/job/FAILED";
/** Asynchronous notification event when a job is cancelled.
+ * If a job execution is cancelled it is not rescheduled.
* The property {...@link #PROPERTY_NOTIFICATION_JOB} contains the job
event and the
* property {...@link org.osgi.service.event.EventConstants#TIMESTAMP}
contains the
* timestamp of the event (as a Long).
@@ -661,7 +663,9 @@
return "<null>";
}
final StringBuilder buffer = new StringBuilder(e.getClass().getName());
- buffer.append(" [topic=");
+ buffer.append('(');
+ buffer.append(e.hashCode());
+ buffer.append(") [topic=");
buffer.append(e.getTopic());
buffer.append(", properties=");
final String[] names = e.getPropertyNames();
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908572&r1=908571&r2=908572&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Wed Feb 10 16:34:47 2010
@@ -404,7 +404,7 @@
process = this.processingEventsList.remove(info.nodePath)
!= null;
}
if ( process ) {
- this.logger.info("No acknowledge received for job {}
stored at {}. Requeueing job.", info.event, info.nodePath);
+ this.logger.info("No acknowledge received for job {}
stored at {}. Requeueing job.", EventUtil.toString(info.event), info.nodePath);
this.finishedJob(info.event, info.nodePath, true);
}
}
@@ -451,7 +451,9 @@
this.ignoreException(e);
}
if ( event != null && this.running ) {
- logger.debug("Persisting job {}", event);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Persisting job {}",
EventUtil.toString(event));
+ }
final EventInfo info = new EventInfo();
info.event = event;
final String jobId =
(String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
@@ -512,7 +514,7 @@
}
} catch (RepositoryException re ) {
// something went wrong, so let's log it
- this.logger.error("Exception during writing new
job '" + event + "' to repository at " + nodePath, re);
+ this.logger.error("Exception during writing new
job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re);
}
}
}
@@ -570,13 +572,17 @@
}
if ( info != null && this.running ) {
- logger.debug("Processing new job {}", info.event);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Received new job {}",
EventUtil.toString(info.event));
+ }
// check for local only jobs and remove them from the queue if
they're meant
// for another application node
final String appId =
(String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION);
if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL)
!= null
&& appId != null && !this.applicationId.equals(appId) ) {
- logger.debug("Discarding job {} : local job for a
different application node.", info.event);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Discarding job {} : local job for a
different application node.", EventUtil.toString(info.event));
+ }
info = null;
}
@@ -584,7 +590,9 @@
if ( info != null &&
info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
final String queueName =
(String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
synchronized ( this.jobQueues ) {
- logger.debug("Queuing job {} into queue {}.",
info.event, queueName);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Queuing job {} into queue {}.",
EventUtil.toString(info.event), queueName);
+ }
BlockingQueue<EventInfo> jobQueue =
this.jobQueues.get(queueName);
if ( jobQueue == null ) {
final boolean orderedQueue =
info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
@@ -699,7 +707,9 @@
boolean putback = false;
boolean wait = false;
synchronized (this.backgroundLock) {
- logger.debug("Executing job {}.", info.event);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Executing job {}.",
EventUtil.toString(info.event));
+ }
try {
this.backgroundSession.refresh(false);
// check if the node still exists
@@ -738,7 +748,9 @@
}
// check number of parallel jobs for main queue
if ( process && jobQueue == null && this.parallelJobCount
>= this.maximumParallelJobs ) {
- logger.debug("Rescheduling job {} - maximum parallel
job count of {} reached!", info.event, this.maximumParallelJobs);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Rescheduling job {} - maximum
parallel job count of {} reached!", EventUtil.toString(info.event),
this.maximumParallelJobs);
+ }
process = false;
wait = true;
}
@@ -826,12 +838,16 @@
* @see
org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
*/
public void handleEvent(final Event event) {
- logger.debug("Receiving event {}", event);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Receiving event {}", EventUtil.toString(event));
+ }
// we ignore remote job events
if ( EventUtil.isLocal(event) ) {
// check for bundle event
if ( event.getTopic().equals(EventUtil.TOPIC_JOB)) {
- logger.debug("Handling local job {}", event);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Handling local job {}",
EventUtil.toString(event));
+ }
// job event
final String jobTopic =
(String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
@@ -956,7 +972,9 @@
final ParallelInfo parInfo = ParallelInfo.getParallelInfo(event);
final boolean parallelProcessing = parInfo.processParallel;
final String jobTopic =
(String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
- logger.debug("Starting job {}", event);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Starting job {}", EventUtil.toString(event));
+ }
boolean unlock = true;
try {
if ( isMainQueue ) {
@@ -1231,6 +1249,9 @@
* @see
org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event,
String, boolean)
*/
public boolean finishedJob(Event job, String eventNodePath, boolean
shouldReschedule) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Received finish for job {},
shouldReschedule={}", EventUtil.toString(job), shouldReschedule);
+ }
// let's remove the event from our processing list
// this is just a sanity check, as usually the job should have been
// removed during sendAcknowledge.
@@ -1259,11 +1280,20 @@
newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT,
retryCount);
newProperties.put(EventUtil.PROPERTY_JOB_RETRIES, retries);
job = new Event(job.getTopic(), newProperties);
- this.sendNotification(EventUtil.TOPIC_JOB_CANCELLED, job);
- } else {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Failed job {}",
EventUtil.toString(job));
+ }
this.sendNotification(EventUtil.TOPIC_JOB_FAILED, job);
+ } else {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Cancelled job {}",
EventUtil.toString(job));
+ }
+ this.sendNotification(EventUtil.TOPIC_JOB_CANCELLED, job);
}
} else {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Finished job {}", EventUtil.toString(job));
+ }
this.sendNotification(EventUtil.TOPIC_JOB_FINISHED, job);
}
final ParallelInfo parInfo = ParallelInfo.getParallelInfo(job);
@@ -1401,7 +1431,9 @@
}
private void putBackIntoMainQueue(final EventInfo info, final boolean
useSleepTime) {
- logger.debug("Putting job {} back into the queue.", info.event);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug("Putting job {} back into the queue.",
EventUtil.toString(info.event));
+ }
final Date fireDate = new Date();
if ( useSleepTime ) {
fireDate.setTime(System.currentTimeMillis() + this.sleepTime *
1000);