Author: cziegeler
Date: Fri Mar 26 11:23:04 2010
New Revision: 927774
URL: http://svn.apache.org/viewvc?rev=927774&view=rev
Log:
SLING-1467 : Cancelling a job does not work
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java?rev=927774&r1=927773&r2=927774&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
Fri Mar 26 11:23:04 2010
@@ -83,17 +83,23 @@ public interface JobStatusProvider {
/**
* Cancel this job.
+ * Cancelling a job might fail if the job is currently in processing.
* @param jobId The unique identifer as found in the property {@link #PROPERTY_EVENT_ID}.
+ * @return <code>true</code> if the job could be cancelled or does not exist anymore.
+ * <code>false</code> otherwise.
*/
- void cancelJob(String jobId);
+ boolean cancelJob(String jobId);
/**
* Cancel this job.
+ * Cancelling a job might fail if the job is currently in processing.
* This method can be used if the topic and the provided job id is known.
* @param topic The job topic as put into the property {@link EventUtil#PROPERTY_JOB_TOPIC}.
* @param jobId The unique identifer as put into the property {@link EventUtil#PROPERTY_JOB_ID}.
+ * @return <code>true</code> if the job could be cancelled or does not exist anymore.
+ * <code>false</code> otherwise.
*/
- void cancelJob(String topic, String jobId);
+ boolean cancelJob(String topic, String jobId);
/**
* Wake up the named job queue.
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=927774&r1=927773&r2=927774&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Fri Mar 26 11:23:04 2010
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import javax.jcr.Item;
import javax.jcr.ItemExistsException;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
@@ -1164,22 +1163,28 @@ public class JobEventHandler
long count = 0;
while ( result.hasNext() && count < maxLoad ) {
final Node eventNode = result.nextNode();
- eventCreated =
eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
- if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
- count++;
+ final String propPath = eventNode.getPath() + '/' +
EventHelper.NODE_PROPERTY_CREATED;
+ if ( this.backgroundSession.itemExists(propPath) ) {
+ eventCreated =
eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
+ if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
+ count++;
+ }
}
}
// now we have to add all jobs with the same created time!
boolean done = false;
while ( result.hasNext() && !done ) {
final Node eventNode = result.nextNode();
- final long created =
eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
- if ( created == eventCreated ) {
- if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
- count++;
+ final String propPath = eventNode.getPath() + '/' +
EventHelper.NODE_PROPERTY_CREATED;
+ if ( this.backgroundSession.itemExists(propPath) ) {
+ final long created =
eventNode.getProperty(EventHelper.NODE_PROPERTY_CREATED).getLong();
+ if ( created == eventCreated ) {
+ if ( tryToLoadJob(eventNode, this.unloadedJobs) ) {
+ count++;
+ }
+ } else {
+ done = true;
}
- } else {
- done = true;
}
}
// have we processed all jobs?
@@ -1611,34 +1616,46 @@ public class JobEventHandler
/**
* @see
org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String,
java.lang.String)
*/
- public void cancelJob(String topic, String jobId) {
+ public boolean cancelJob(String topic, String jobId) {
if ( jobId != null && topic != null ) {
- this.cancelJob(JobUtil.getUniquePath(topic, jobId));
+ try {
+ final String uniqueJobId = this.getWriterRootNode().getPath()
+ '/' + JobUtil.getUniquePath(topic, jobId);
+ return this.cancelJob(uniqueJobId);
+ } catch (RepositoryException e) {
+ // this only happens if getPath() throws which really should not happen
+ this.ignoreException(e);
+ }
}
+ return true;
}
/**
* @see
org.apache.sling.event.JobStatusProvider#cancelJob(java.lang.String)
*/
- public void cancelJob(String jobId) {
+ public boolean cancelJob(String jobId) {
if ( jobId != null ) {
- synchronized ( this.writeLock ) {
+ synchronized ( this.backgroundLock ) {
try {
- this.writerSession.refresh(false);
+ this.backgroundSession.refresh(false);
} catch (RepositoryException e) {
this.ignoreException(e);
}
try {
- if ( this.writerSession.itemExists(jobId) ) {
- final Item item = this.writerSession.getItem(jobId);
- item.remove();
- this.writerSession.save();
+ if ( this.backgroundSession.itemExists(jobId) ) {
+ final Node eventNode = (Node)
this.backgroundSession.getItem(jobId);
+ if ( eventNode.isLocked() ) {
+ this.logger.info("Attempted to cancel a running
job at {}", jobId);
+ return false;
+ }
+ eventNode.remove();
+ this.backgroundSession.save();
}
} catch (RepositoryException e) {
this.logger.error("Error during cancelling job at " +
jobId, e);
}
}
}
+ return true;
}
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java?rev=927774&r1=927773&r2=927774&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
Fri Mar 26 11:23:04 2010
@@ -30,6 +30,7 @@ import javax.jcr.RepositoryException;
import javax.jcr.Session;
import org.apache.sling.api.services.SlingSettingsService;
+import org.apache.sling.commons.classloader.DynamicClassLoaderManager;
import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
import org.apache.sling.commons.threads.ThreadPoolConfig;
import org.apache.sling.jcr.api.SlingRepository;
@@ -75,7 +76,12 @@ public abstract class AbstractRepository
@org.junit.Before public void setup() throws Exception {
this.handler.repository = RepositoryTestUtil.getSlingRepository();
+ this.handler.classLoaderManager = new DynamicClassLoaderManager() {
+ public ClassLoader getDynamicClassLoader() {
+ return this.getClass().getClassLoader();
+ }
+ };
// the event admin
final EventAdmin eventAdmin = this.getMockery().mock(EventAdmin.class);
this.handler.eventAdmin = eventAdmin;
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java?rev=927774&r1=927773&r2=927774&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/JobEventHandlerTest.java
Fri Mar 26 11:23:04 2010
@@ -142,6 +142,44 @@ public class JobEventHandlerTest extends
}
/**
+ * Test cancelling a job
+ * The job execution always fails
+ */
+ @org.junit.Test public void testCancelJob() throws Exception {
+ final JobEventHandler jeh = (JobEventHandler)this.handler;
+ final Barrier cb = new Barrier(2);
+ jeh.eventAdmin = new SimpleEventAdmin(new String[] {"sling/test"},
+ new EventHandler[] {
+ new EventHandler() {
+ public void handleEvent(Event event) {
+ EventUtil.acknowledgeJob(event);
+ cb.block();
+ try {
+ Thread.sleep(400);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ EventUtil.rescheduleJob(event);
+ }
+
+ }
+ });
+ jeh.handleEvent(getJobEvent(null, "myid", null));
+ assertEquals(1, jeh.getAllJobs("sling/test").size());
+ cb.block();
+ // job is currently sleeping, therefore cancel fails
+ assertFalse(jeh.cancelJob("sling/test", "myid"));
+ try {
+ Thread.sleep(800);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ // the job is now in the queue again
+ assertTrue(jeh.cancelJob("sling/test", "myid"));
+ assertEquals(0, jeh.getAllJobs("sling/test").size());
+ }
+
+ /**
* Reschedule test.
* The job is rescheduled two times before it fails.
*/