Author: cziegeler
Date: Fri Jan  8 16:51:13 2016
New Revision: 1723760

URL: http://svn.apache.org/viewvc?rev=1723760&view=rev
Log:
SLING-5417 : Jobs are only partially persisted

Modified:
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/SchedulingTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/UnorderedQueueTest.java

Modified: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
 Fri Jan  8 16:51:13 2016
@@ -28,8 +28,11 @@ import static org.ops4j.pax.exam.CoreOpt
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Dictionary;
 import java.util.Hashtable;
+import java.util.List;
 
 import javax.inject.Inject;
 
@@ -64,6 +67,8 @@ public abstract class AbstractJobHandlin
 
     private static final String DEFAULT_BUILD_DIR = "target";
 
+    private static final String PORT_CONFIG = "org.osgi.service.http.port";
+
     protected static final int DEFAULT_TEST_TIMEOUT = 1000*60*5;
 
     @Inject
@@ -75,7 +80,7 @@ public abstract class AbstractJobHandlin
     @Inject
     protected BundleContext bc;
 
-    private static final String PORT_CONFIG = "org.osgi.service.http.port";
+    protected List<ServiceRegistration<?>> registrations = new ArrayList<>();
 
     @Configuration
     public Option[] config() {
@@ -223,12 +228,13 @@ public abstract class AbstractJobHandlin
     }
 
     public void cleanup() {
+        // clean job area
         final ServiceReference<ResourceResolverFactory> ref = 
this.bc.getServiceReference(ResourceResolverFactory.class);
         final ResourceResolverFactory factory = this.bc.getService(ref);
         ResourceResolver resolver = null;
         try {
             resolver = factory.getAdministrativeResourceResolver(null);
-            final Resource rsrc = 
resolver.getResource(JobManagerConfiguration.DEFAULT_REPOSITORY_PATH);
+            final Resource rsrc = resolver.getResource("/var/eventing");
             if ( rsrc != null ) {
                 delete(rsrc);
                 resolver.commit();
@@ -242,7 +248,13 @@ public abstract class AbstractJobHandlin
                 resolver.close();
             }
         }
-        // remove all configurations and clean content
+        // unregister all services
+        for(final ServiceRegistration<?> reg : this.registrations) {
+            reg.unregister();
+        }
+        this.registrations.clear();
+
+        // remove all configurations
         try {
             final org.osgi.service.cm.Configuration[] cfgs = 
this.configAdmin.listConfigurations(null);
             if ( cfgs != null ) {
@@ -271,15 +283,16 @@ public abstract class AbstractJobHandlin
         props.put(EventConstants.EVENT_TOPIC, topic);
         final ServiceRegistration<EventHandler> reg = 
this.bc.registerService(EventHandler.class,
                 handler, props);
+        this.registrations.add(reg);
         return reg;
     }
 
     protected long getConsumerChangeCount() {
         long result = -1;
         try {
-            final ServiceReference[] refs = 
this.bc.getServiceReferences(PropertyProvider.class.getName(), 
"(changeCount=*)");
-            if ( refs != null && refs.length > 0 ) {
-                result = (Long)refs[0].getProperty("changeCount");
+            final Collection<ServiceReference<PropertyProvider>> refs = 
this.bc.getServiceReferences(PropertyProvider.class, "(changeCount=*)");
+            if ( !refs.isEmpty() ) {
+                result = 
(Long)refs.iterator().next().getProperty("changeCount");
             }
         } catch ( final InvalidSyntaxException ignore ) {
             // ignore
@@ -309,6 +322,7 @@ public abstract class AbstractJobHandlin
         props.put(JobConsumer.PROPERTY_TOPICS, topic);
         final ServiceRegistration<JobConsumer> reg = 
this.bc.registerService(JobConsumer.class,
                 handler, props);
+        this.registrations.add(reg);
         this.waitConsumerChangeCount(cc + 1);
         return reg;
     }
@@ -323,7 +337,15 @@ public abstract class AbstractJobHandlin
         props.put(JobConsumer.PROPERTY_TOPICS, topic);
         final ServiceRegistration<JobExecutor> reg = 
this.bc.registerService(JobExecutor.class,
                 handler, props);
+        this.registrations.add(reg);
         this.waitConsumerChangeCount(cc + 1);
         return reg;
     }
+
+    protected void unregister(final ServiceRegistration<?> reg) {
+        if ( reg != null ) {
+            this.registrations.remove(reg);
+            reg.unregister();
+        }
+    }
 }

Modified: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
 Fri Jan  8 16:51:13 2016
@@ -128,9 +128,9 @@ public class ChaosTest extends AbstractJ
     /**
      * Setup consumers
      */
-    private void setupJobConsumers(final List<ServiceRegistration<?>> 
registrations) {
+    private void setupJobConsumers() {
         for(int i=0; i<NUM_ORDERED_TOPICS; i++) {
-            registrations.add(this.registerJobConsumer(ORDERED_TOPICS[i],
+            this.registerJobConsumer(ORDERED_TOPICS[i],
 
                 new JobConsumer() {
 
@@ -138,10 +138,10 @@ public class ChaosTest extends AbstractJ
                     public JobResult process(final Job job) {
                         return JobResult.OK;
                     }
-                }));
+                });
         }
         for(int i=0; i<NUM_PARALLEL_TOPICS; i++) {
-            registrations.add(this.registerJobConsumer(PARALLEL_TOPICS[i],
+            this.registerJobConsumer(PARALLEL_TOPICS[i],
 
                 new JobConsumer() {
 
@@ -149,10 +149,10 @@ public class ChaosTest extends AbstractJ
                     public JobResult process(final Job job) {
                         return JobResult.OK;
                     }
-                }));
+                });
         }
         for(int i=0; i<NUM_ROUND_TOPICS; i++) {
-            registrations.add(this.registerJobConsumer(ROUND_TOPICS[i],
+            this.registerJobConsumer(ROUND_TOPICS[i],
 
                 new JobConsumer() {
 
@@ -160,7 +160,7 @@ public class ChaosTest extends AbstractJ
                     public JobResult process(final Job job) {
                         return JobResult.OK;
                     }
-                }));
+                });
         }
     }
 
@@ -335,11 +335,10 @@ public class ChaosTest extends AbstractJ
             topics.add(ROUND_TOPICS[i]);
         }
 
-        final List<ServiceRegistration<?>> registrations = new 
ArrayList<ServiceRegistration<?>>();
         final List<Thread> threads = new ArrayList<Thread>();
         final AtomicLong finishedThreads = new AtomicLong();
 
-        final ServiceRegistration<EventHandler> eventHandler = 
this.registerEventHandler("org/apache/sling/event/notification/job/*",
+        this.registerEventHandler("org/apache/sling/event/notification/job/*",
                 new EventHandler() {
 
                     @Override
@@ -352,44 +351,44 @@ public class ChaosTest extends AbstractJ
                         }
                     }
                 });
-        try {
-            // setup job consumers
-            this.setupJobConsumers(registrations);
-
-            // setup job creation tests
-            this.setupJobCreationThreads(threads, jobManager, created, 
finishedThreads);
 
-            this.setupChaosThreads(threads, finishedThreads);
+        // setup job consumers
+        this.setupJobConsumers();
 
-            System.out.println("Starting threads...");
-            // start threads
-            for(final Thread t : threads) {
-                t.setDaemon(true);
-                t.start();
-            }
+        // setup job creation tests
+        this.setupJobCreationThreads(threads, jobManager, created, 
finishedThreads);
 
-            System.out.println("Sleeping for " + DURATION + " seconds to wait 
for threads to finish...");
-            // for sure we can sleep for the duration
-            this.sleep(DURATION * 1000);
-
-            System.out.println("Polling for threads to finish...");
-            // wait until threads are finished
-            while ( finishedThreads.get() < threads.size() ) {
-                this.sleep(100);
-            }
+        this.setupChaosThreads(threads, finishedThreads);
 
-            System.out.println("Waiting for job handling to finish...");
-            final Set<String> allTopics = new HashSet<String>(topics);
-            while ( !allTopics.isEmpty() ) {
-                final Iterator<String> iter = allTopics.iterator();
-                while ( iter.hasNext() ) {
-                    final String topic = iter.next();
-                    if ( finished.get(topic).get() == created.get(topic).get() 
) {
-                        iter.remove();
-                    }
+        System.out.println("Starting threads...");
+        // start threads
+        for(final Thread t : threads) {
+            t.setDaemon(true);
+            t.start();
+        }
+
+        System.out.println("Sleeping for " + DURATION + " seconds to wait for 
threads to finish...");
+        // for sure we can sleep for the duration
+        this.sleep(DURATION * 1000);
+
+        System.out.println("Polling for threads to finish...");
+        // wait until threads are finished
+        while ( finishedThreads.get() < threads.size() ) {
+            this.sleep(100);
+        }
+
+        System.out.println("Waiting for job handling to finish...");
+        final Set<String> allTopics = new HashSet<String>(topics);
+        while ( !allTopics.isEmpty() ) {
+            final Iterator<String> iter = allTopics.iterator();
+            while ( iter.hasNext() ) {
+                final String topic = iter.next();
+                if ( finished.get(topic).get() == created.get(topic).get() ) {
+                    iter.remove();
                 }
-                this.sleep(100);
             }
+            this.sleep(100);
+        }
 /* We could try to enable this with Oak again - but right now JR observation 
handler is too
  * slow.
             System.out.println("Checking notifications...");
@@ -398,12 +397,5 @@ public class ChaosTest extends AbstractJ
             }
  */
 
-        } finally {
-            eventHandler.unregister();
-            for(final ServiceRegistration<?> reg : registrations) {
-                reg.unregister();
-            }
-        }
-
     }
 }

Modified: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
 Fri Jan  8 16:51:13 2016
@@ -44,7 +44,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventHandler;
 
@@ -83,7 +82,7 @@ public class ClassloadingTest extends Ab
     public void testSimpleClassloading() throws Exception {
         final AtomicInteger processedJobsCount = new AtomicInteger(0);
         final List<Event> finishedEvents = Collections.synchronizedList(new 
ArrayList<Event>());
-        final ServiceRegistration<JobConsumer> jcReg = 
this.registerJobConsumer(TOPIC,
+        this.registerJobConsumer(TOPIC,
                 new JobConsumer() {
                     @Override
                     public JobResult process(Job job) {
@@ -91,7 +90,7 @@ public class ClassloadingTest extends Ab
                         return JobResult.OK;
                     }
                 });
-        final ServiceRegistration<EventHandler> ehReg = 
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
 
                     @Override
@@ -99,70 +98,65 @@ public class ClassloadingTest extends Ab
                         finishedEvents.add(event);
                     }
                 });
-        try {
-            final JobManager jobManager = this.getJobManager();
+        final JobManager jobManager = this.getJobManager();
 
-            final List<String> list = new ArrayList<String>();
-            list.add("1");
-            list.add("2");
-
-            final Map<String, String> map = new HashMap<String, String>();
-            map.put("a", "a1");
-            map.put("b", "b2");
-
-            // we start a single job
-            final Map<String, Object> props = new HashMap<String, Object>();
-            props.put("string", "Hello");
-            props.put("int", new Integer(5));
-            props.put("long", new Long(7));
-            props.put("list", list);
-            props.put("map", map);
-
-            final String jobId = jobManager.addJob(TOPIC, props).getId();
-
-            new 
RetryLoop(Conditions.collectionIsNotEmptyCondition(finishedEvents,
-                    "Waiting for finishedEvents to have at least one 
element"), 5, 50);
-
-            // no jobs queued, none processed and no available
-            new RetryLoop(new RetryLoop.Condition() {
-
-                @Override
-                public String getDescription() {
-                    return "Waiting for job to be processed. Conditions: 
queuedJobs=" + jobManager.getStatistics().getNumberOfQueuedJobs() +
-                            ", jobsCount=" + processedJobsCount + ", 
findJobs=" +
-                            jobManager.findJobs(JobManager.QueryType.ALL, 
TOPIC, -1, (Map<String, Object>[]) null)
-                            .size();
-                }
-
-                @Override
-                public boolean isTrue() throws Exception {
-                    return jobManager.getStatistics().getNumberOfQueuedJobs() 
== 0
-                            && processedJobsCount.get() == 1
-                            && jobManager.findJobs(JobManager.QueryType.ALL, 
TOPIC, -1, (Map<String, Object>[]) null)
-                                    .size() == 0;
-                }
-            }, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
-
-            final String jobTopic = 
(String)finishedEvents.get(0).getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
-            assertNotNull(jobTopic);
-            assertEquals("Hello", finishedEvents.get(0).getProperty("string"));
-            assertEquals(new Integer(5), 
Integer.valueOf(finishedEvents.get(0).getProperty("int").toString()));
-            assertEquals(new Long(7), 
Long.valueOf(finishedEvents.get(0).getProperty("long").toString()));
-            assertEquals(list, finishedEvents.get(0).getProperty("list"));
-            assertEquals(map, finishedEvents.get(0).getProperty("map"));
-
-            jobManager.removeJobById(jobId);
-        } finally {
-            jcReg.unregister();
-            ehReg.unregister();
-        }
+        final List<String> list = new ArrayList<String>();
+        list.add("1");
+        list.add("2");
+
+        final Map<String, String> map = new HashMap<String, String>();
+        map.put("a", "a1");
+        map.put("b", "b2");
+
+        // we start a single job
+        final Map<String, Object> props = new HashMap<String, Object>();
+        props.put("string", "Hello");
+        props.put("int", new Integer(5));
+        props.put("long", new Long(7));
+        props.put("list", list);
+        props.put("map", map);
+
+        final String jobId = jobManager.addJob(TOPIC, props).getId();
+
+        new RetryLoop(Conditions.collectionIsNotEmptyCondition(finishedEvents,
+                "Waiting for finishedEvents to have at least one element"), 5, 
50);
+
+        // no jobs queued, none processed and no available
+        new RetryLoop(new RetryLoop.Condition() {
+
+            @Override
+            public String getDescription() {
+                return "Waiting for job to be processed. Conditions: 
queuedJobs=" + jobManager.getStatistics().getNumberOfQueuedJobs() +
+                        ", jobsCount=" + processedJobsCount + ", findJobs=" +
+                        jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, 
-1, (Map<String, Object>[]) null)
+                        .size();
+            }
+
+            @Override
+            public boolean isTrue() throws Exception {
+                return jobManager.getStatistics().getNumberOfQueuedJobs() == 0
+                        && processedJobsCount.get() == 1
+                        && jobManager.findJobs(JobManager.QueryType.ALL, 
TOPIC, -1, (Map<String, Object>[]) null)
+                                .size() == 0;
+            }
+        }, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
+
+        final String jobTopic = 
(String)finishedEvents.get(0).getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
+        assertNotNull(jobTopic);
+        assertEquals("Hello", finishedEvents.get(0).getProperty("string"));
+        assertEquals(new Integer(5), 
Integer.valueOf(finishedEvents.get(0).getProperty("int").toString()));
+        assertEquals(new Long(7), 
Long.valueOf(finishedEvents.get(0).getProperty("long").toString()));
+        assertEquals(list, finishedEvents.get(0).getProperty("list"));
+        assertEquals(map, finishedEvents.get(0).getProperty("map"));
+
+        jobManager.removeJobById(jobId);
     }
 
     @Test(timeout = DEFAULT_TEST_TIMEOUT)
     public void testFailedClassloading() throws Exception {
         final AtomicInteger failedJobsCount = new AtomicInteger(0);
         final List<Event> finishedEvents = Collections.synchronizedList(new 
ArrayList<Event>());
-        final ServiceRegistration<JobConsumer> jcReg = 
this.registerJobConsumer(TOPIC + "/failed",
+        this.registerJobConsumer(TOPIC + "/failed",
                 new JobConsumer() {
 
                     @Override
@@ -171,7 +165,7 @@ public class ClassloadingTest extends Ab
                         return JobResult.OK;
                     }
                 });
-        final ServiceRegistration<EventHandler> ehReg = 
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
 
                     @Override
@@ -179,52 +173,47 @@ public class ClassloadingTest extends Ab
                         finishedEvents.add(event);
                     }
                 });
-        try {
-            final JobManager jobManager = this.getJobManager();
+        final JobManager jobManager = this.getJobManager();
+
+        // dao is an invisible class for the dynamic class loader as it is not 
public
+        // therefore scheduling this job should fail!
+        final DataObject dao = new DataObject();
+
+        // we start a single job
+        final Map<String, Object> props = new HashMap<String, Object>();
+        props.put("dao", dao);
+
+        final String id = jobManager.addJob(TOPIC + "/failed", props).getId();
+
+        // wait until the conditions are met
+        new RetryLoop(new RetryLoop.Condition() {
+
+            @Override
+            public boolean isTrue() throws Exception {
+                return failedJobsCount.get() == 0
+                        && finishedEvents.size() == 0
+                        && jobManager.findJobs(JobManager.QueryType.ALL, TOPIC 
+ "/failed", -1,
+                                (Map<String, Object>[]) null).size() == 1
+                        && jobManager.getStatistics().getNumberOfQueuedJobs() 
== 0
+                        && jobManager.getStatistics().getNumberOfActiveJobs() 
== 0;
+            }
+
+            @Override
+            public String getDescription() {
+                return "Waiting for job failure to be recorded. Conditions " +
+                       "faildJobsCount=" + failedJobsCount.get() +
+                       ", finishedEvents=" + finishedEvents.size() +
+                       ", findJobs= " + 
jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1,
+                               (Map<String, Object>[]) null).size()
+                       +", queuedJobs=" + 
jobManager.getStatistics().getNumberOfQueuedJobs()
+                       +", activeJobs=" + 
jobManager.getStatistics().getNumberOfActiveJobs();
+            }
+        }, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
+
+        jobManager.removeJobById(id); // moves the job to the history section
+        assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + 
"/failed", -1, (Map<String, Object>[])null).size());
 
-            // dao is an invisible class for the dynamic class loader as it is 
not public
-            // therefore scheduling this job should fail!
-            final DataObject dao = new DataObject();
-
-            // we start a single job
-            final Map<String, Object> props = new HashMap<String, Object>();
-            props.put("dao", dao);
-
-            final String id = jobManager.addJob(TOPIC + "/failed", 
props).getId();
-
-            // wait until the conditions are met
-            new RetryLoop(new RetryLoop.Condition() {
-
-                @Override
-                public boolean isTrue() throws Exception {
-                    return failedJobsCount.get() == 0
-                            && finishedEvents.size() == 0
-                            && jobManager.findJobs(JobManager.QueryType.ALL, 
TOPIC + "/failed", -1,
-                                    (Map<String, Object>[]) null).size() == 1
-                            && 
jobManager.getStatistics().getNumberOfQueuedJobs() == 0
-                            && 
jobManager.getStatistics().getNumberOfActiveJobs() == 0;
-                }
-
-                @Override
-                public String getDescription() {
-                    return "Waiting for job failure to be recorded. Conditions 
" +
-                           "faildJobsCount=" + failedJobsCount.get() +
-                           ", finishedEvents=" + finishedEvents.size() +
-                           ", findJobs= " + 
jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1,
-                                   (Map<String, Object>[]) null).size()
-                           +", queuedJobs=" + 
jobManager.getStatistics().getNumberOfQueuedJobs()
-                           +", activeJobs=" + 
jobManager.getStatistics().getNumberOfActiveJobs();
-                }
-            }, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
-
-            jobManager.removeJobById(id); // moves the job to the history 
section
-            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, 
TOPIC + "/failed", -1, (Map<String, Object>[])null).size());
-
-            jobManager.removeJobById(id); // removes the job permanently
-        } finally {
-            jcReg.unregister();
-            ehReg.unregister();
-        }
+        jobManager.removeJobById(id); // removes the job permanently
     }
 
     private static final class DataObject implements Serializable {

Modified: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
 Fri Jan  8 16:51:13 2016
@@ -41,7 +41,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
 
 @RunWith(PaxExam.class)
 public class HistoryTest extends AbstractJobHandlingTest {
@@ -88,7 +87,7 @@ public class HistoryTest extends Abstrac
      */
     @Test(timeout = DEFAULT_TEST_TIMEOUT)
     public void testHistory() throws Exception {
-        final ServiceRegistration reg = this.registerJobExecutor(TOPIC,
+        this.registerJobExecutor(TOPIC,
                 new JobExecutor() {
 
                     @Override
@@ -102,51 +101,41 @@ public class HistoryTest extends Abstrac
                     }
 
                 });
-        Collection<Job> col = null;
-        try {
-            for(int i = 0; i< 10; i++) {
-                this.addJob(i);
-            }
-            this.sleep(200L);
-            while ( 
this.getJobManager().findJobs(JobManager.QueryType.HISTORY, TOPIC, -1, 
(Map<String, Object>[])null).size() < 10 ) {
-                this.sleep(20L);
-            }
-            col = this.getJobManager().findJobs(JobManager.QueryType.HISTORY, 
TOPIC, -1, (Map<String, Object>[])null);
-            assertEquals(10, col.size());
-            assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.ACTIVE, TOPIC, -1, 
(Map<String, Object>[])null).size());
-            assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.QUEUED, TOPIC, -1, 
(Map<String, Object>[])null).size());
-            assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String, 
Object>[])null).size());
-            assertEquals(3, 
this.getJobManager().findJobs(JobManager.QueryType.CANCELLED, TOPIC, -1, 
(Map<String, Object>[])null).size());
-            assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.DROPPED, TOPIC, -1, 
(Map<String, Object>[])null).size());
-            assertEquals(3, 
this.getJobManager().findJobs(JobManager.QueryType.ERROR, TOPIC, -1, 
(Map<String, Object>[])null).size());
-            assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.GIVEN_UP, TOPIC, -1, 
(Map<String, Object>[])null).size());
-            assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.STOPPED, TOPIC, -1, 
(Map<String, Object>[])null).size());
-            assertEquals(7, 
this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, TOPIC, -1, 
(Map<String, Object>[])null).size());
-
-            // find all topics
-            assertEquals(7, 
this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, null, -1, 
(Map<String, Object>[])null).size());
-
-            // verify order, message and state
-            long last = 9;
-            for(final Job j : col) {
-                assertNotNull(j.getFinishedDate());
-                final long count = j.getProperty(PROP_COUNTER, Long.class);
-                assertEquals(last, count);
-                if ( count == 2 || count == 5 || count == 7 ) {
-                    assertEquals(Job.JobState.ERROR, j.getJobState());
-                } else {
-                    assertEquals(Job.JobState.SUCCEEDED, j.getJobState());
-                }
-                assertEquals(j.getJobState().name(), j.getResultMessage());
-                last--;
-            }
-        } finally {
-            if ( col != null ) {
-                for(final Job j : col) {
-                    this.getJobManager().removeJobById(j.getId());
-                }
+        for(int i = 0; i< 10; i++) {
+            this.addJob(i);
+        }
+        this.sleep(200L);
+        while ( this.getJobManager().findJobs(JobManager.QueryType.HISTORY, 
TOPIC, -1, (Map<String, Object>[])null).size() < 10 ) {
+            this.sleep(20L);
+        }
+        Collection<Job> col = 
this.getJobManager().findJobs(JobManager.QueryType.HISTORY, TOPIC, -1, 
(Map<String, Object>[])null);
+        assertEquals(10, col.size());
+        assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.ACTIVE, TOPIC, -1, 
(Map<String, Object>[])null).size());
+        assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.QUEUED, TOPIC, -1, 
(Map<String, Object>[])null).size());
+        assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String, 
Object>[])null).size());
+        assertEquals(3, 
this.getJobManager().findJobs(JobManager.QueryType.CANCELLED, TOPIC, -1, 
(Map<String, Object>[])null).size());
+        assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.DROPPED, TOPIC, -1, 
(Map<String, Object>[])null).size());
+        assertEquals(3, 
this.getJobManager().findJobs(JobManager.QueryType.ERROR, TOPIC, -1, 
(Map<String, Object>[])null).size());
+        assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.GIVEN_UP, TOPIC, -1, 
(Map<String, Object>[])null).size());
+        assertEquals(0, 
this.getJobManager().findJobs(JobManager.QueryType.STOPPED, TOPIC, -1, 
(Map<String, Object>[])null).size());
+        assertEquals(7, 
this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, TOPIC, -1, 
(Map<String, Object>[])null).size());
+
+        // find all topics
+        assertEquals(7, 
this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, null, -1, 
(Map<String, Object>[])null).size());
+
+        // verify order, message and state
+        long last = 9;
+        for(final Job j : col) {
+            assertNotNull(j.getFinishedDate());
+            final long count = j.getProperty(PROP_COUNTER, Long.class);
+            assertEquals(last, count);
+            if ( count == 2 || count == 5 || count == 7 ) {
+                assertEquals(Job.JobState.ERROR, j.getJobState());
+            } else {
+                assertEquals(Job.JobState.SUCCEEDED, j.getJobState());
             }
-            reg.unregister();
+            assertEquals(j.getJobState().name(), j.getResultMessage());
+            last--;
         }
     }
 }
\ No newline at end of file

Modified: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
 Fri Jan  8 16:51:13 2016
@@ -49,7 +49,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventHandler;
 
@@ -90,7 +89,7 @@ public class JobHandlingTest extends Abs
     public void testSimpleJobExecutionUsingJobConsumer() throws Exception {
         final Barrier cb = new Barrier(2);
 
-        final ServiceRegistration reg = this.registerJobConsumer(TOPIC,
+        this.registerJobConsumer(TOPIC,
                 new JobConsumer() {
 
             @Override
@@ -100,14 +99,10 @@ public class JobHandlingTest extends Abs
                     }
                  });
 
-        try {
-            this.getJobManager().addJob(TOPIC, null);
-            assertTrue("No event received in the given time.", cb.block(5));
-            cb.reset();
-            assertFalse("Unexpected event received in the given time.", cb.block(5));
-        } finally {
-            reg.unregister();
-        }
+        this.getJobManager().addJob(TOPIC, null);
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
     }
 
     /**
@@ -118,7 +113,7 @@ public class JobHandlingTest extends Abs
     public void testSimpleJobExecutionUsingJobExecutor() throws Exception {
         final Barrier cb = new Barrier(2);
 
-        final ServiceRegistration reg = this.registerJobExecutor(TOPIC,
+        this.registerJobExecutor(TOPIC,
                 new JobExecutor() {
 
                     @Override
@@ -128,19 +123,15 @@ public class JobHandlingTest extends Abs
                     }
                 });
 
-        try {
-            this.getJobManager().addJob(TOPIC, null);
-            assertTrue("No event received in the given time.", cb.block(5));
-            cb.reset();
-            assertFalse("Unexpected event received in the given time.", cb.block(5));
-        } finally {
-            reg.unregister();
-        }
+        this.getJobManager().addJob(TOPIC, null);
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
     }
 
     @Test(timeout = DEFAULT_TEST_TIMEOUT)
     public void testManyJobs() throws Exception {
-        final ServiceRegistration reg1 = this.registerJobConsumer(TOPIC,
+        this.registerJobConsumer(TOPIC,
                 new JobConsumer() {
 
                     @Override
@@ -150,7 +141,7 @@ public class JobHandlingTest extends Abs
 
                  });
         final AtomicInteger count = new AtomicInteger(0);
-        final ServiceRegistration reg2 = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
                     @Override
                     public void handleEvent(final Event event) {
@@ -158,23 +149,16 @@ public class JobHandlingTest extends Abs
                     }
                  });
 
-        try {
-            // we start "some" jobs
-            final int COUNT = 300;
-            for(int i = 0; i < COUNT; i++ ) {
-                this.getJobManager().addJob(TOPIC, null);
-            }
-            while ( count.get() < COUNT ) {
-                this.sleep(50);
-            }
-            assertEquals("Finished count", COUNT, count.get());
-            assertEquals("Finished count", COUNT, this.getJobManager().getStatistics().getNumberOfFinishedJobs());
-        } finally {
-            reg1.unregister();
-            reg2.unregister();
+        // we start "some" jobs
+        final int COUNT = 300;
+        for(int i = 0; i < COUNT; i++ ) {
+            this.getJobManager().addJob(TOPIC, null);
         }
-        // we put an extra sleep to see whether this fixes the test problems on Java 8
-        this.sleep(5000);
+        while ( count.get() < COUNT ) {
+            this.sleep(50);
+        }
+        assertEquals("Finished count", COUNT, count.get());
+        assertEquals("Finished count", COUNT, this.getJobManager().getStatistics().getNumberOfFinishedJobs());
     }
 
     /**
@@ -185,7 +169,7 @@ public class JobHandlingTest extends Abs
     public void testCancelJob() throws Exception {
         final Barrier cb = new Barrier(2);
         final Barrier cb2 = new Barrier(2);
-        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+        this.registerJobConsumer(TOPIC,
                 new JobConsumer() {
 
                     @Override
@@ -195,41 +179,38 @@ public class JobHandlingTest extends Abs
                         return JobResult.FAILED;
                     }
                 });
+
+        final Map<String, Object> jobProperties = Collections.singletonMap("id", (Object)"cancelJobId");
+        @SuppressWarnings("unchecked")
+        final Map<String, Object>[] jobPropertiesAsArray = new Map[1];
+        jobPropertiesAsArray[0] = jobProperties;
+
+        // create job
+        final JobManager jobManager = this.getJobManager();
+        jobManager.addJob(TOPIC, jobProperties);
+        cb.block();
+
+        assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, jobPropertiesAsArray).size());
+        // job is currently waiting, therefore cancel fails
+        final Job e1 = jobManager.getJob(TOPIC, jobProperties);
+        assertNotNull(e1);
+        cb2.block(); // and continue job
+
+        sleep(200);
+
+        // the job is now in the queue again
+        final Job e2 = jobManager.getJob(TOPIC, jobProperties);
+        assertNotNull(e2);
+        assertTrue(jobManager.removeJobById(e2.getId()));
+        assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, jobPropertiesAsArray).size());
+        final Collection<Job> col = jobManager.findJobs(JobManager.QueryType.HISTORY, TOPIC, -1,
+                jobPropertiesAsArray);
         try {
-            final Map<String, Object> jobProperties = Collections.singletonMap("id", (Object)"cancelJobId");
-            @SuppressWarnings("unchecked")
-            final Map<String, Object>[] jobPropertiesAsArray = new Map[1];
-            jobPropertiesAsArray[0] = jobProperties;
-
-            // create job
-            final JobManager jobManager = this.getJobManager();
-            jobManager.addJob(TOPIC, jobProperties);
-            cb.block();
-
-            assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, jobPropertiesAsArray).size());
-            // job is currently waiting, therefore cancel fails
-            final Job e1 = jobManager.getJob(TOPIC, jobProperties);
-            assertNotNull(e1);
-            cb2.block(); // and continue job
-
-            sleep(200);
-
-            // the job is now in the queue again
-            final Job e2 = jobManager.getJob(TOPIC, jobProperties);
-            assertNotNull(e2);
-            assertTrue(jobManager.removeJobById(e2.getId()));
-            assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, jobPropertiesAsArray).size());
-            final Collection<Job> col = jobManager.findJobs(JobManager.QueryType.HISTORY, TOPIC, -1,
-                    jobPropertiesAsArray);
-            try {
-                assertEquals(1, col.size());
-            } finally {
-                for(final Job j : col) {
-                    jobManager.removeJobById(j.getId());
-                }
-            }
+            assertEquals(1, col.size());
         } finally {
-            jcReg.unregister();
+            for(final Job j : col) {
+                jobManager.removeJobById(j.getId());
+            }
         }
    }
 
@@ -240,7 +221,7 @@ public class JobHandlingTest extends Abs
     public void testGetJob() throws Exception {
         final Barrier cb = new Barrier(2);
         final Barrier cb2 = new Barrier(2);
-        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+        this.registerJobConsumer(TOPIC,
                 new JobConsumer() {
 
                     @Override
@@ -250,19 +231,15 @@ public class JobHandlingTest extends Abs
                         return JobResult.OK;
                     }
                 });
-        try {
-            final JobManager jobManager = this.getJobManager();
-            final Job j = jobManager.addJob(TOPIC, null);
-            cb.block();
+        final JobManager jobManager = this.getJobManager();
+        final Job j = jobManager.addJob(TOPIC, null);
+        cb.block();
 
-            assertNotNull(jobManager.getJob(TOPIC, null));
+        assertNotNull(jobManager.getJob(TOPIC, null));
 
-            cb2.block(); // and continue job
+        cb2.block(); // and continue job
 
-            jobManager.removeJobById(j.getId());
-        } finally {
-            jcReg.unregister();
-        }
+        jobManager.removeJobById(j.getId());
     }
 
     /**
@@ -273,7 +250,8 @@ public class JobHandlingTest extends Abs
     public void testStartJobAndReschedule() throws Exception {
         final List<Integer> retryCountList = new ArrayList<Integer>();
         final Barrier cb = new Barrier(2);
-        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+
+        this.registerJobConsumer(TOPIC,
                 new JobConsumer() {
                     int retryCount;
 
@@ -291,26 +269,23 @@ public class JobHandlingTest extends Abs
                         return JobResult.FAILED;
                     }
                 });
-        try {
-            final JobManager jobManager = this.getJobManager();
-            final Job job = jobManager.addJob(TOPIC, null);
 
-            assertTrue("No event received in the given time.", cb.block(5));
-            cb.reset();
-            // the job is retried after two seconds, so we wait again
-            assertTrue("No event received in the given time.", cb.block(5));
-            cb.reset();
-            // the job is retried after two seconds, so we wait again
-            assertTrue("No event received in the given time.", cb.block(5));
-            // we have reached the retry so we expect to not get an event
-            cb.reset();
-            assertFalse("Unexpected event received in the given time.", cb.block(5));
-            assertEquals("Unexpected number of retries", 3, retryCountList.size());
+        final JobManager jobManager = this.getJobManager();
+        final Job job = jobManager.addJob(TOPIC, null);
 
-            jobManager.removeJobById(job.getId());
-        } finally {
-            jcReg.unregister();
-        }
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        // the job is retried after two seconds, so we wait again
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+        // the job is retried after two seconds, so we wait again
+        assertTrue("No event received in the given time.", cb.block(5));
+        // we have reached the retry so we expect to not get an event
+        cb.reset();
+        assertFalse("Unexpected event received in the given time.", cb.block(5));
+        assertEquals("Unexpected number of retries", 3, retryCountList.size());
+
+        jobManager.removeJobById(job.getId());
     }
 
     /**
@@ -324,7 +299,7 @@ public class JobHandlingTest extends Abs
         final List<String> failed = Collections.synchronizedList(new ArrayList<String>());
         final List<String> finished = Collections.synchronizedList(new ArrayList<String>());
         final List<String> started = Collections.synchronizedList(new ArrayList<String>());
-        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+        this.registerJobConsumer(TOPIC,
                 new JobConsumer() {
 
                     @Override
@@ -362,7 +337,7 @@ public class JobHandlingTest extends Abs
                         return JobResult.FAILED;
                     }
                 });
-        final ServiceRegistration eh1Reg = this.registerEventHandler(NotificationConstants.TOPIC_JOB_CANCELLED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_CANCELLED,
                 new EventHandler() {
 
                     @Override
@@ -371,7 +346,7 @@ public class JobHandlingTest extends Abs
                         cancelled.add(id);
                     }
                 });
-        final ServiceRegistration eh2Reg = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FAILED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_FAILED,
                 new EventHandler() {
 
                     @Override
@@ -380,7 +355,7 @@ public class JobHandlingTest extends Abs
                         failed.add(id);
                     }
                 });
-        final ServiceRegistration eh3Reg = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
 
                     @Override
@@ -389,7 +364,7 @@ public class JobHandlingTest extends Abs
                         finished.add(id);
                     }
                 });
-        final ServiceRegistration eh4Reg = this.registerEventHandler(NotificationConstants.TOPIC_JOB_STARTED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_STARTED,
                 new EventHandler() {
 
                     @Override
@@ -400,37 +375,26 @@ public class JobHandlingTest extends Abs
                 });
 
         final JobManager jobManager = this.getJobManager();
-        try {
-            jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"1"));
-            jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"2"));
-            jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"3"));
-            jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"4"));
-            jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"5"));
-
-            int count = 0;
-            final long startTime = System.currentTimeMillis();
-            do {
-                count = finished.size() + cancelled.size();
-                // after 25 seconds we cancel the test
-                if ( System.currentTimeMillis() - startTime > 25000 ) {
-                    throw new Exception("Timeout during notification test.");
-                }
-            } while ( count < 5 || started.size() < 10 );
-            assertEquals("Finished count", 4, finished.size());
-            assertEquals("Cancelled count", 1, cancelled.size());
-            assertEquals("Started count", 10, started.size());
-            assertEquals("Failed count", 5, failed.size());
-        } finally {
-            final Collection<Job> col = jobManager.findJobs(JobManager.QueryType.HISTORY, "sling/test", -1, (Map<String, Object>[])null);
-            for(final Job j : col) {
-                jobManager.removeJobById(j.getId());
+
+        jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"1"));
+        jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"2"));
+        jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"3"));
+        jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"4"));
+        jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"5"));
+
+        int count = 0;
+        final long startTime = System.currentTimeMillis();
+        do {
+            count = finished.size() + cancelled.size();
+            // after 25 seconds we cancel the test
+            if ( System.currentTimeMillis() - startTime > 25000 ) {
+                throw new Exception("Timeout during notification test.");
             }
-            jcReg.unregister();
-            eh1Reg.unregister();
-            eh2Reg.unregister();
-            eh3Reg.unregister();
-            eh4Reg.unregister();
-        }
+        } while ( count < 5 || started.size() < 10 );
+        assertEquals("Finished count", 4, finished.size());
+        assertEquals("Cancelled count", 1, cancelled.size());
+        assertEquals("Started count", 10, started.size());
+        assertEquals("Failed count", 5, failed.size());
     }
 
     /**
@@ -440,7 +404,7 @@ public class JobHandlingTest extends Abs
     public void testNoJobProcessor() throws Exception {
         final AtomicInteger count = new AtomicInteger(0);
 
-        final ServiceRegistration eh1 = this.registerJobConsumer(TOPIC,
+        this.registerJobConsumer(TOPIC,
                 new JobConsumer() {
 
             @Override
@@ -451,31 +415,22 @@ public class JobHandlingTest extends Abs
             }
          });
 
-        try {
-            final JobManager jobManager = this.getJobManager();
-
-            // we start 20 jobs, every second job has no processor
-            final int COUNT = 20;
-            for(int i = 0; i < COUNT; i++ ) {
-                final String jobTopic = (i % 2 == 0 ? TOPIC : TOPIC + "2");
-
-                jobManager.addJob(jobTopic, null);
-            }
-            while ( jobManager.getStatistics().getNumberOfFinishedJobs() < COUNT / 2) {
-                this.sleep(50);
-            }
+        final JobManager jobManager = this.getJobManager();
 
-            assertEquals("Finished count", COUNT / 2, count.get());
-            // unprocessed count should be 0 as there is no job consumer for this job
-            assertEquals("Unprocessed count", 0, jobManager.getStatistics().getNumberOfJobs());
-            assertEquals("Finished count", COUNT / 2, jobManager.getStatistics().getNumberOfFinishedJobs());
+        // we start 20 jobs, every second job has no processor
+        final int COUNT = 20;
+        for(int i = 0; i < COUNT; i++ ) {
+            final String jobTopic = (i % 2 == 0 ? TOPIC : TOPIC + "2");
 
-            // now remove jobs
-            for(final Job j : jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "2", -1, (Map<String, Object>[])null)) {
-                jobManager.removeJobById(j.getId());
-            }
-        } finally {
-            eh1.unregister();
+            jobManager.addJob(jobTopic, null);
+        }
+        while ( jobManager.getStatistics().getNumberOfFinishedJobs() < COUNT / 2) {
+            this.sleep(50);
         }
+
+        assertEquals("Finished count", COUNT / 2, count.get());
+        // unprocessed count should be 0 as there is no job consumer for this job
+        assertEquals("Unprocessed count", 0, jobManager.getStatistics().getNumberOfJobs());
+        assertEquals("Finished count", COUNT / 2, jobManager.getStatistics().getNumberOfFinishedJobs());
     }
 }
\ No newline at end of file

Modified: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
 Fri Jan  8 16:51:13 2016
@@ -44,7 +44,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventHandler;
 
@@ -86,7 +85,7 @@ public class OrderedQueueTest extends Ab
         final Barrier cb = new Barrier(2);
         final AtomicInteger count = new AtomicInteger(0);
         final AtomicInteger parallelCount = new AtomicInteger(0);
-        final ServiceRegistration<JobConsumer> jcReg = this.registerJobConsumer("sling/orderedtest/*",
+        this.registerJobConsumer("sling/orderedtest/*",
                 new JobConsumer() {
 
                     private volatile int lastCounter = -1;
@@ -123,7 +122,7 @@ public class OrderedQueueTest extends Ab
                         return JobResult.OK;
                     }
                 });
-        final ServiceRegistration<EventHandler> ehReg = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
 
                     @Override
@@ -132,48 +131,43 @@ public class OrderedQueueTest extends Ab
                     }
                 });
 
-        try {
-            // we first sent one event to get the queue started
-            final Map<String, Object> properties = new HashMap<String, Object>();
-            properties.put("counter", -1);
-            jobManager.addJob("sling/orderedtest/start", properties);
-            assertTrue("No event received in the given time.", cb.block(5));
-            cb.reset();
-
-            // get the queue
-            final Queue q = jobManager.getQueue("orderedtest");
-            assertNotNull("Queue 'orderedtest' should exist!", q);
-
-            // suspend it
-            q.suspend();
-
-            final int NUM_JOBS = 30;
-
-            // we start "some" jobs:
-            for(int i = 0; i < NUM_JOBS; i++ ) {
-                final String subTopic = "sling/orderedtest/sub" + (i % 10);
-                properties.clear();
-                properties.put("counter", i);
-                jobManager.addJob(subTopic, properties);
-            }
-            // start the queue
-            q.resume();
-            while ( count.get() < NUM_JOBS +1 ) {
-                try {
-                    Thread.sleep(500);
-                } catch (InterruptedException ie) {
-                    // ignore
-                }
+        // we first sent one event to get the queue started
+        final Map<String, Object> properties = new HashMap<String, Object>();
+        properties.put("counter", -1);
+        jobManager.addJob("sling/orderedtest/start", properties);
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+
+        // get the queue
+        final Queue q = jobManager.getQueue("orderedtest");
+        assertNotNull("Queue 'orderedtest' should exist!", q);
+
+        // suspend it
+        q.suspend();
+
+        final int NUM_JOBS = 30;
+
+        // we start "some" jobs:
+        for(int i = 0; i < NUM_JOBS; i++ ) {
+            final String subTopic = "sling/orderedtest/sub" + (i % 10);
+            properties.clear();
+            properties.put("counter", i);
+            jobManager.addJob(subTopic, properties);
+        }
+        // start the queue
+        q.resume();
+        while ( count.get() < NUM_JOBS +1 ) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ie) {
+                // ignore
             }
-            // we started one event before the test, so add one
-            assertEquals("Finished count", NUM_JOBS + 1, count.get());
-            assertEquals("Finished count", NUM_JOBS + 1, jobManager.getStatistics().getNumberOfFinishedJobs());
-            assertEquals("Finished count", NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
-            assertEquals("Failed count", NUM_JOBS / 10, q.getStatistics().getNumberOfFailedJobs());
-            assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
-        } finally {
-            jcReg.unregister();
-            ehReg.unregister();
         }
+        // we started one event before the test, so add one
+        assertEquals("Finished count", NUM_JOBS + 1, count.get());
+        assertEquals("Finished count", NUM_JOBS + 1, jobManager.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Finished count", NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Failed count", NUM_JOBS / 10, q.getStatistics().getNumberOfFailedJobs());
+        assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
     }
 }
\ No newline at end of file

Modified: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
 Fri Jan  8 16:51:13 2016
@@ -44,7 +44,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventHandler;
 
@@ -87,7 +86,7 @@ public class RoundRobinQueueTest extends
 
         final Barrier cb = new Barrier(2);
 
-        final ServiceRegistration jc1Reg = this.registerJobConsumer(TOPIC + "/start",
+        this.registerJobConsumer(TOPIC + "/start",
                 new JobConsumer() {
 
                     @Override
@@ -102,7 +101,7 @@ public class RoundRobinQueueTest extends
         final AtomicInteger parallelCount = new AtomicInteger(0);
         final Set<Integer> maxParticipants = new HashSet<Integer>();
 
-        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC + "/*",
+        this.registerJobConsumer(TOPIC + "/*",
                 new JobConsumer() {
 
                     @Override
@@ -120,7 +119,7 @@ public class RoundRobinQueueTest extends
                         return JobResult.OK;
                     }
                 });
-        final ServiceRegistration ehReg = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
 
                     @Override
@@ -129,51 +128,45 @@ public class RoundRobinQueueTest extends
                     }
                 });
 
-        try {
-            // we first sent one event to get the queue started
-            jobManager.addJob(TOPIC + "/start", null);
-            assertTrue("No event received in the given time.", cb.block(5));
-            cb.reset();
-
-            // get the queue
-            final Queue q = jobManager.getQueue(QUEUE_NAME);
-            assertNotNull("Queue '" + QUEUE_NAME + "' should exist!", q);
-
-            // suspend it
-            q.suspend();
-
-            // we start "some" jobs:
-            for(int i = 0; i < NUM_JOBS; i++ ) {
-                final String subTopic = TOPIC + "/sub" + (i % 10);
-                final Map<String, Object> props = new HashMap<String, Object>();
-                if ( i < 10 ) {
-                    props.put("sleep", 300);
-                } else {
-                    props.put("sleep", 30);
-                }
-                jobManager.addJob(subTopic, props);
+        // we first sent one event to get the queue started
+        jobManager.addJob(TOPIC + "/start", null);
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+
+        // get the queue
+        final Queue q = jobManager.getQueue(QUEUE_NAME);
+        assertNotNull("Queue '" + QUEUE_NAME + "' should exist!", q);
+
+        // suspend it
+        q.suspend();
+
+        // we start "some" jobs:
+        for(int i = 0; i < NUM_JOBS; i++ ) {
+            final String subTopic = TOPIC + "/sub" + (i % 10);
+            final Map<String, Object> props = new HashMap<String, Object>();
+            if ( i < 10 ) {
+                props.put("sleep", 300);
+            } else {
+                props.put("sleep", 30);
             }
-            // start the queue
-            q.resume();
-            while ( count.get() < NUM_JOBS  + 1 ) {
-                assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
-                assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
-                sleep(300);
-            }
-            // we started one event before the test, so add one
-            assertEquals("Finished count", NUM_JOBS + 1, count.get());
-            assertEquals("Finished count", NUM_JOBS + 1, jobManager.getStatistics().getNumberOfFinishedJobs());
-            assertEquals("Finished count", NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+            jobManager.addJob(subTopic, props);
+        }
+        // start the queue
+        q.resume();
+        while ( count.get() < NUM_JOBS  + 1 ) {
             assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
             assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
-            for(int i=1; i <= MAX_PAR; i++) {
-                assertTrue("# Participants " + String.valueOf(i) + " not in " + maxParticipants,
-                        maxParticipants.contains(i));
-            }
-        } finally {
-            jc1Reg.unregister();
-            jcReg.unregister();
-            ehReg.unregister();
+            sleep(300);
+        }
+        // we started one event before the test, so add one
+        assertEquals("Finished count", NUM_JOBS + 1, count.get());
+        assertEquals("Finished count", NUM_JOBS + 1, jobManager.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Finished count", NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
+        assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
+        for(int i=1; i <= MAX_PAR; i++) {
+            assertTrue("# Participants " + String.valueOf(i) + " not in " + maxParticipants,
+                    maxParticipants.contains(i));
         }
     }
 }

Modified: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/SchedulingTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/SchedulingTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/SchedulingTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/SchedulingTest.java
 Fri Jan  8 16:51:13 2016
@@ -32,7 +32,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
 
 @RunWith(PaxExam.class)
 public class SchedulingTest extends AbstractJobHandlingTest {
@@ -57,7 +56,7 @@ public class SchedulingTest extends Abst
     public void testScheduling() throws Exception {
         final AtomicInteger counter = new AtomicInteger();
 
-        final ServiceRegistration ehReg = this.registerJobConsumer(TOPIC, new JobConsumer() {
+        this.registerJobConsumer(TOPIC, new JobConsumer() {
 
             @Override
             public JobResult process(final Job job) {
@@ -68,26 +67,21 @@ public class SchedulingTest extends Abst
             }
 
         });
-        try {
 
-            // we schedule three jobs
-            final ScheduledJobInfo info1 = this.getJobManager().createJob(TOPIC).schedule().hourly(5).add();
-            assertNotNull(info1);
-            final ScheduledJobInfo info2 = this.getJobManager().createJob(TOPIC).schedule().daily(10, 5).add();
-            assertNotNull(info2);
-            final ScheduledJobInfo info3 = this.getJobManager().createJob(TOPIC).schedule().weekly(3, 19, 12).add();
-            assertNotNull(info3);
-
-            assertEquals(3, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
-            info3.unschedule();
-            assertEquals(2, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
-            info1.unschedule();
-            assertEquals(1, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
-            info2.unschedule();
-            assertEquals(0, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
-        } finally {
-            ehReg.unregister();
-        }
+        // we schedule three jobs
+        final ScheduledJobInfo info1 = this.getJobManager().createJob(TOPIC).schedule().hourly(5).add();
+        assertNotNull(info1);
+        final ScheduledJobInfo info2 = this.getJobManager().createJob(TOPIC).schedule().daily(10, 5).add();
+        assertNotNull(info2);
+        final ScheduledJobInfo info3 = this.getJobManager().createJob(TOPIC).schedule().weekly(3, 19, 12).add();
+        assertNotNull(info3);
+
+        assertEquals(3, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
+        info3.unschedule();
+        assertEquals(2, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
+        info1.unschedule();
+        assertEquals(1, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
+        info2.unschedule();
+        assertEquals(0, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
     }
-
 }

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java Fri Jan  8 16:51:13 2016
@@ -33,7 +33,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
 
 @RunWith(PaxExam.class)
 public class TimedJobsTest extends AbstractJobHandlingTest {
@@ -58,7 +57,7 @@ public class TimedJobsTest extends Abstr
     public void testTimedJob() throws Exception {
         final AtomicInteger counter = new AtomicInteger();
 
-        final ServiceRegistration ehReg = this.registerJobConsumer(TOPIC, new JobConsumer() {
+        this.registerJobConsumer(TOPIC, new JobConsumer() {
 
             @Override
             public JobResult process(final Job job) {
@@ -69,22 +68,19 @@ public class TimedJobsTest extends Abstr
             }
 
         });
-        try {
-            final Date d = new Date();
-            d.setTime(System.currentTimeMillis() + 3000); // run in 3 seconds
-
-            // create scheduled job
-            final ScheduledJobInfo info = this.getJobManager().createJob(TOPIC).schedule().at(d).add();
-            assertNotNull(info);
 
-            while ( counter.get() == 0 ) {
-                this.sleep(1000);
-            }
-            assertEquals(0, this.getJobManager().getScheduledJobs().size()); // job is not scheduled anymore
-            info.unschedule();
-        } finally {
-            ehReg.unregister();
+        final Date d = new Date();
+        d.setTime(System.currentTimeMillis() + 3000); // run in 3 seconds
+
+        // create scheduled job
+        final ScheduledJobInfo info = this.getJobManager().createJob(TOPIC).schedule().at(d).add();
+        assertNotNull(info);
+
+        while ( counter.get() == 0 ) {
+            this.sleep(1000);
         }
+        assertEquals(0, this.getJobManager().getScheduledJobs().size()); // job is not scheduled anymore
+        info.unschedule();
     }
 
 }

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java Fri Jan  8 16:51:13 2016
@@ -61,7 +61,7 @@ public class TopicMatchingTest extends A
     public void testSimpleMatching() throws Exception {
         final Barrier barrier = new Barrier(2);
 
-        final ServiceRegistration<JobExecutor> reg = this.registerJobExecutor("sling/test/*",
+        this.registerJobExecutor("sling/test/*",
                 new JobExecutor() {
 
                     @Override
@@ -69,7 +69,7 @@ public class TopicMatchingTest extends A
                         return context.result().succeeded();
                     }
                 });
-        final ServiceRegistration<EventHandler> eventHandler = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
 
                     @Override
@@ -78,13 +78,8 @@ public class TopicMatchingTest extends A
                     }
                 });
 
-        try {
-            this.getJobManager().addJob(TOPIC, null);
-            barrier.block();
-        } finally {
-            reg.unregister();
-            eventHandler.unregister();
-        }
+        this.getJobManager().addJob(TOPIC, null);
+        barrier.block();
     }
 
     /**
@@ -94,7 +89,7 @@ public class TopicMatchingTest extends A
     public void testDeepMatching() throws Exception {
         final Barrier barrier = new Barrier(2);
 
-        final ServiceRegistration<JobExecutor> reg = this.registerJobExecutor("sling/**",
+        this.registerJobExecutor("sling/**",
                 new JobExecutor() {
 
                     @Override
@@ -102,7 +97,7 @@ public class TopicMatchingTest extends A
                         return context.result().succeeded();
                     }
                 });
-        final ServiceRegistration<EventHandler> eventHandler = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
 
                     @Override
@@ -111,13 +106,8 @@ public class TopicMatchingTest extends A
                     }
                 });
 
-        try {
-            this.getJobManager().addJob(TOPIC, null);
-            barrier.block();
-        } finally {
-            reg.unregister();
-            eventHandler.unregister();
-        }
+        this.getJobManager().addJob(TOPIC, null);
+        barrier.block();
     }
 
     /**
@@ -129,7 +119,7 @@ public class TopicMatchingTest extends A
         final Barrier barrier2 = new Barrier(2);
         final Barrier barrier3 = new Barrier(2);
 
-        final ServiceRegistration<JobExecutor> reg1 = this.registerJobExecutor("sling/**",
+        this.registerJobExecutor("sling/**",
                 new JobExecutor() {
 
                     @Override
@@ -163,17 +153,16 @@ public class TopicMatchingTest extends A
 
         // second test, unregister reg3, now it should be reg2
         long cc = this.getConsumerChangeCount();
-        reg3.unregister();
+        this.unregister(reg3);
         this.waitConsumerChangeCount(cc + 1);
         this.getJobManager().addJob(TOPIC, null);
         barrier2.block();
 
         // third test, unregister reg2, reg1 is now the only one
         cc = this.getConsumerChangeCount();
-        reg2.unregister();
+        this.unregister(reg2);
         this.waitConsumerChangeCount(cc + 1);
         this.getJobManager().addJob(TOPIC, null);
         barrier1.block();
-        reg1.unregister();
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/UnorderedQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/UnorderedQueueTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/UnorderedQueueTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/UnorderedQueueTest.java Fri Jan  8 16:51:13 2016
@@ -44,7 +44,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventHandler;
 
@@ -87,7 +86,7 @@ public class UnorderedQueueTest extends
 
         final Barrier cb = new Barrier(2);
 
-        final ServiceRegistration jc1Reg = this.registerJobConsumer(TOPIC + "/start",
+        this.registerJobConsumer(TOPIC + "/start",
                 new JobConsumer() {
 
                     @Override
@@ -102,7 +101,7 @@ public class UnorderedQueueTest extends
         final AtomicInteger parallelCount = new AtomicInteger(0);
         final Set<Integer> maxParticipants = new HashSet<Integer>();
 
-        final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC + "/*",
+        this.registerJobConsumer(TOPIC + "/*",
                 new JobConsumer() {
 
                     @Override
@@ -120,7 +119,7 @@ public class UnorderedQueueTest extends
                         return JobResult.OK;
                     }
                 });
-        final ServiceRegistration ehReg = this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+        this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
                 new EventHandler() {
 
                     @Override
@@ -129,51 +128,45 @@ public class UnorderedQueueTest extends
                     }
                 });
 
-        try {
-            // we first sent one event to get the queue started
-            jobManager.addJob(TOPIC + "/start", null);
-            assertTrue("No event received in the given time.", cb.block(5));
-            cb.reset();
-
-            // get the queue
-            final Queue q = jobManager.getQueue(QUEUE_NAME);
-            assertNotNull("Queue '" + QUEUE_NAME + "' should exist!", q);
-
-            // suspend it
-            q.suspend();
-
-            // we start "some" jobs:
-            for(int i = 0; i < NUM_JOBS; i++ ) {
-                final String subTopic = TOPIC + "/sub" + (i % 10);
-                final Map<String, Object> props = new HashMap<String, Object>();
-                if ( i < 10 ) {
-                    props.put("sleep", 300);
-                } else {
-                    props.put("sleep", 30);
-                }
-                jobManager.addJob(subTopic, props);
+        // we first sent one event to get the queue started
+        jobManager.addJob(TOPIC + "/start", null);
+        assertTrue("No event received in the given time.", cb.block(5));
+        cb.reset();
+
+        // get the queue
+        final Queue q = jobManager.getQueue(QUEUE_NAME);
+        assertNotNull("Queue '" + QUEUE_NAME + "' should exist!", q);
+
+        // suspend it
+        q.suspend();
+
+        // we start "some" jobs:
+        for(int i = 0; i < NUM_JOBS; i++ ) {
+            final String subTopic = TOPIC + "/sub" + (i % 10);
+            final Map<String, Object> props = new HashMap<String, Object>();
+            if ( i < 10 ) {
+                props.put("sleep", 300);
+            } else {
+                props.put("sleep", 30);
             }
-            // start the queue
-            q.resume();
-            while ( count.get() < NUM_JOBS  + 1 ) {
-                assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
-                assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
-                sleep(300);
-            }
-            // we started one event before the test, so add one
-            assertEquals("Finished count", NUM_JOBS + 1, count.get());
-            assertEquals("Finished count", NUM_JOBS + 1, jobManager.getStatistics().getNumberOfFinishedJobs());
-            assertEquals("Finished count", NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+            jobManager.addJob(subTopic, props);
+        }
+        // start the queue
+        q.resume();
+        while ( count.get() < NUM_JOBS  + 1 ) {
             assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
             assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
-            for(int i=1; i <= MAX_PAR; i++) {
-                assertTrue("# Participants " + String.valueOf(i) + " not in " + maxParticipants,
-                        maxParticipants.contains(i));
-            }
-        } finally {
-            jc1Reg.unregister();
-            jcReg.unregister();
-            ehReg.unregister();
+            sleep(300);
+        }
+        // we started one event before the test, so add one
+        assertEquals("Finished count", NUM_JOBS + 1, count.get());
+        assertEquals("Finished count", NUM_JOBS + 1, jobManager.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Finished count", NUM_JOBS + 1, q.getStatistics().getNumberOfFinishedJobs());
+        assertEquals("Failed count", 0, q.getStatistics().getNumberOfFailedJobs());
+        assertEquals("Cancelled count", 0, q.getStatistics().getNumberOfCancelledJobs());
+        for(int i=1; i <= MAX_PAR; i++) {
+            assertTrue("# Participants " + String.valueOf(i) + " not in " + maxParticipants,
+                    maxParticipants.contains(i));
         }
     }
 }


Reply via email to