Author: cziegeler
Date: Mon Dec 13 16:37:02 2010
New Revision: 1045225
URL: http://svn.apache.org/viewvc?rev=1045225&view=rev
Log:
Several fixes: fix timeout handling and clear statistics.
Refactoring: rename *CleanUp to *Removal (e.g. markForCleanUp -> markForRemoval).
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
Mon Dec 13 16:37:02 2010
@@ -211,15 +211,17 @@ public class DefaultJobManager
* is idle for two consecutive clean up calls, it is removed.
* @see java.lang.Runnable#run()
*/
- public void cleanup() {
+ private void cleanup() {
// check for idle queue
// we synchronize to avoid creating a queue which is about to be
removed during cleanup
synchronized ( queuesLock ) {
final Iterator<Map.Entry<String, AbstractJobQueue>> i =
this.queues.entrySet().iterator();
while ( i.hasNext() ) {
final Map.Entry<String, AbstractJobQueue> current = i.next();
+ // clean up
final AbstractJobQueue jbq = current.getValue();
- if ( jbq.isMarkedForCleanUp() ) {
+ jbq.cleanUp();
+ if ( jbq.isMarkedForRemoval() ) {
// close
jbq.close();
// copy statistics
@@ -228,7 +230,7 @@ public class DefaultJobManager
i.remove();
} else {
// mark to be removed during next cycle
- jbq.markForCleanUp();
+ jbq.markForRemoval();
}
}
}
@@ -701,8 +703,8 @@ public class DefaultJobManager
// remove the queue with the old name
this.queues.remove(queue.getName());
// check if we can close or have to rename
- queue.markForCleanUp();
- if ( queue.isMarkedForCleanUp() ) {
+ queue.markForRemoval();
+ if ( queue.isMarkedForRemoval() ) {
// close
queue.close();
// copy statistics
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
Mon Dec 13 16:37:02 2010
@@ -273,5 +273,7 @@ public class StatisticsImpl implements S
this.finishedJobs = 0;
this.failedJobs = 0;
this.cancelledJobs = 0;
+ this.activeJobs = 0;
+ this.queuedJobs = 0;
}
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java
Mon Dec 13 16:37:02 2010
@@ -924,8 +924,10 @@ public class PersistenceHandler implemen
synchronized ( this.backgroundLock ) {
if ( this.running ) {
try {
- final Node eventNode = (Node)
this.backgroundSession.getItem(path);
- this.tryToLoadJob(eventNode, this.unloadedJobs);
+ if ( this.backgroundSession.itemExists(path) ) {
+ final Node eventNode = (Node)
this.backgroundSession.getItem(path);
+ this.tryToLoadJob(eventNode, this.unloadedJobs);
+ }
} catch (RepositoryException re) {
this.ignoreException(re);
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Mon Dec 13 16:37:02 2010
@@ -72,8 +72,8 @@ public abstract class AbstractJobQueue
/** Are we still running? */
protected volatile boolean running;
- /** Are we marked for cleanup */
- private volatile boolean markedForCleanUp = false;
+ /** Are we marked for removal */
+ private volatile boolean markedForRemoval = false;
/** Is the queue currently waiting(sleeping) */
protected volatile boolean isWaiting = false;
@@ -110,7 +110,7 @@ public abstract class AbstractJobQueue
* @see org.apache.sling.event.jobs.Queue#getStateInfo()
*/
public String getStateInfo() {
- return "isWaiting=" + this.isWaiting + ", markedForCleanUp=" +
this.markedForCleanUp + ", suspendedSince=" + this.suspendedSince.longValue();
+ return "isWaiting=" + this.isWaiting + ", markedForRemoval=" +
this.markedForRemoval + ", suspendedSince=" + this.suspendedSince.longValue();
}
/**
@@ -180,7 +180,7 @@ public abstract class AbstractJobQueue
/**
* Periodically cleanup.
*/
- public void cleanup() {
+ public void cleanUp() {
if ( this.running ) {
// check for jobs that were started but never got an aknowledge
final long tooOld = System.currentTimeMillis() -
DEFAULT_WAIT_FOR_ACK_IN_MS;
@@ -215,8 +215,13 @@ public abstract class AbstractJobQueue
process = this.startedJobsLists.remove(info.uniqueId) !=
null;
}
if ( process ) {
- this.logger.info("No acknowledge received for job {}
stored at {}. Requeueing job.", EventUtil.toString(info.event), info.uniqueId);
- this.finishedJob(info.event, true);
+ this.decQueued();
+ if ( !info.reschedule() ) {
+ checkForNotify(null);
+ } else {
+ this.logger.info("No acknowledge received for job {}
stored at {}. Requeueing job.", EventUtil.toString(info.event), info.uniqueId);
+ checkForNotify(info);
+ }
}
}
}
@@ -357,27 +362,27 @@ public abstract class AbstractJobQueue
notifyFinished(reprocessInfo);
}
- protected boolean canBeMarkedForCleanUp() {
+ protected boolean canBeMarkedForRemoval() {
return this.isEmpty() && !this.isWaiting;
}
/**
- * Mark this queue for cleanup.
+ * Mark this queue for removal.
*/
- public void markForCleanUp() {
- if ( this.canBeMarkedForCleanUp() ) {
- this.markedForCleanUp = true;
+ public void markForRemoval() {
+ if ( this.canBeMarkedForRemoval() ) {
+ this.markedForRemoval = true;
}
}
/**
- * Check if this queue is marked for cleanup
+ * Check if this queue is marked for removal
*/
- public boolean isMarkedForCleanUp() {
- if ( this.markedForCleanUp ) {
- if ( this.canBeMarkedForCleanUp() ) {
+ public boolean isMarkedForRemoval() {
+ if ( this.markedForRemoval ) {
+ if ( this.canBeMarkedForRemoval() ) {
return true;
}
- this.markedForCleanUp = false;
+ this.markedForRemoval = false;
}
return false;
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
Mon Dec 13 16:37:02 2010
@@ -99,8 +99,8 @@ public abstract class AbstractParallelJo
}
@Override
- protected boolean canBeMarkedForCleanUp() {
- boolean result = super.canBeMarkedForCleanUp();
+ protected boolean canBeMarkedForRemoval() {
+ boolean result = super.canBeMarkedForRemoval();
if ( result ) {
result = this.jobCount == 0;
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1045225&r1=1045224&r2=1045225&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
Mon Dec 13 16:37:02 2010
@@ -65,8 +65,8 @@ public final class TopicRoundRobinJobQue
}
@Override
- protected boolean canBeMarkedForCleanUp() {
- boolean result = super.canBeMarkedForCleanUp();
+ protected boolean canBeMarkedForRemoval() {
+ boolean result = super.canBeMarkedForRemoval();
if ( result ) {
result = !this.isWaitingForNext;
}