Author: cziegeler
Date: Thu Jun 13 17:48:04 2013
New Revision: 1492779
URL: http://svn.apache.org/r1492779
Log:
SLING-2906 : Queue might be outdated and closed while still processing
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1492779&r1=1492778&r2=1492779&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Thu Jun 13 17:48:04 2013
@@ -103,6 +103,11 @@ public abstract class AbstractJobQueue
/** Flag for outdated. */
private final AtomicBoolean isOutdated = new AtomicBoolean(false);
+ /** Marker flag if the queue is waiting for another element (= empty) */
+ protected boolean isWaitingForNext = false;
+
+ private final AtomicBoolean closeMarker = new AtomicBoolean(false);
+
/**
* Start this queue
* @param name The queue name
@@ -129,6 +134,7 @@ public abstract class AbstractJobQueue
synchronized ( this.suspendLock ) {
return "isWaiting=" + this.isWaiting +
", suspendedSince=" + this.suspendedSince +
+ ", isWaitingForNext=" + this.isWaitingForNext +
", asyncJobs=" + this.asyncCounter.get();
}
}
@@ -197,14 +203,17 @@ public abstract class AbstractJobQueue
this.resume();
// check if possible
if ( this.canBeClosed() ) {
- this.close();
- return true;
+ if ( this.closeMarker.get() ) {
+ this.close();
+ return true;
+ }
+ this.closeMarker.set(true);
}
return false;
}
- protected boolean canBeClosed() {
- return this.isEmpty() && !this.isWaiting && !this.isSuspended() &&
this.asyncCounter.get() == 0;
+ private boolean canBeClosed() {
+ return this.isEmpty() && !this.isWaiting && !this.isSuspended() &&
this.asyncCounter.get() == 0 && this.isWaitingForNext;
}
/**
@@ -426,6 +435,7 @@ public abstract class AbstractJobQueue
* Add a new job to the queue.
*/
public void process(final JobHandler event) {
+ this.closeMarker.set(false);
this.put(event);
event.queued = System.currentTimeMillis();
this.incQueued();
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1492779&r1=1492778&r2=1492779&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
Thu Jun 13 17:48:04 2013
@@ -46,8 +46,6 @@ public final class OrderedJobQueue exten
/** Lock and status object for handling the sleep phase. */
private final SleepLock sleepLock = new SleepLock();
- private boolean isWaitingForNext = false;
-
/** The queue - we use a set which is sorted by job creation date. */
private final Set<JobHandler> queue = new TreeSet<JobHandler>(new
Comparator<JobHandler>() {
@@ -77,16 +75,7 @@ public final class OrderedJobQueue exten
@Override
public String getStateInfo() {
- return super.getStateInfo() + ", isSleepingUntil=" +
this.sleepLock.sleepingSince + ", isWaitingForNext=" + this.isWaitingForNext;
- }
-
- @Override
- protected boolean canBeClosed() {
- boolean result = super.canBeClosed();
- if ( result ) {
- result = this.isWaitingForNext;
- }
- return result;
+ return super.getStateInfo() + ", isSleepingUntil=" +
this.sleepLock.sleepingSince;
}
@Override
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1492779&r1=1492778&r2=1492779&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
Thu Jun 13 17:48:04 2013
@@ -39,8 +39,6 @@ public final class ParallelJobQueue exte
/** The queue. */
private final BlockingQueue<JobHandler> queue = new
LinkedBlockingQueue<JobHandler>();
- private boolean isWaitingForNext = false;
-
public ParallelJobQueue(final String name,
final InternalQueueConfiguration config,
final JobConsumerManager jobConsumerManager,
@@ -50,20 +48,6 @@ public final class ParallelJobQueue exte
}
@Override
- public String getStateInfo() {
- return super.getStateInfo() + ", isWaitingForNext=" +
this.isWaitingForNext;
- }
-
- @Override
- protected boolean canBeClosed() {
- boolean result = super.canBeClosed();
- if ( result ) {
- result = this.isWaitingForNext;
- }
- return result;
- }
-
- @Override
protected void put(final JobHandler event) {
try {
this.isWaitingForNext = false;
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java?rev=1492779&r1=1492778&r2=1492779&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
Thu Jun 13 17:48:04 2013
@@ -50,8 +50,6 @@ public final class TopicRoundRobinJobQue
/** Event count. */
private int eventCount;
- private boolean isWaitingForNext = false;
-
public TopicRoundRobinJobQueue(final String name,
final InternalQueueConfiguration config,
final JobConsumerManager jobConsumerManager,
@@ -62,16 +60,7 @@ public final class TopicRoundRobinJobQue
@Override
public String getStateInfo() {
- return super.getStateInfo() + ", eventCount=" + this.eventCount + ",
isWaitingForNext=" + this.isWaitingForNext;
- }
-
- @Override
- protected boolean canBeClosed() {
- boolean result = super.canBeClosed();
- if ( result ) {
- result = this.isWaitingForNext;
- }
- return result;
+ return super.getStateInfo() + ", eventCount=" + this.eventCount;
}
@Override