Author: cziegeler
Date: Fri Jan 8 16:51:13 2016
New Revision: 1723760
URL: http://svn.apache.org/viewvc?rev=1723760&view=rev
Log:
SLING-5417 : Jobs are only partially persisted
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/SchedulingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/UnorderedQueueTest.java
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java Fri Jan 8 16:51:13 2016
@@ -28,8 +28,11 @@ import static org.ops4j.pax.exam.CoreOpt
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Dictionary;
import java.util.Hashtable;
+import java.util.List;
import javax.inject.Inject;
@@ -64,6 +67,8 @@ public abstract class AbstractJobHandlin
private static final String DEFAULT_BUILD_DIR = "target";
+ private static final String PORT_CONFIG = "org.osgi.service.http.port";
+
protected static final int DEFAULT_TEST_TIMEOUT = 1000*60*5;
@Inject
@@ -75,7 +80,7 @@ public abstract class AbstractJobHandlin
@Inject
protected BundleContext bc;
- private static final String PORT_CONFIG = "org.osgi.service.http.port";
+ protected List<ServiceRegistration<?>> registrations = new ArrayList<>();
@Configuration
public Option[] config() {
@@ -223,12 +228,13 @@ public abstract class AbstractJobHandlin
}
public void cleanup() {
+ // clean job area
final ServiceReference<ResourceResolverFactory> ref =
this.bc.getServiceReference(ResourceResolverFactory.class);
final ResourceResolverFactory factory = this.bc.getService(ref);
ResourceResolver resolver = null;
try {
resolver = factory.getAdministrativeResourceResolver(null);
- final Resource rsrc = resolver.getResource(JobManagerConfiguration.DEFAULT_REPOSITORY_PATH);
+ final Resource rsrc = resolver.getResource("/var/eventing");
if ( rsrc != null ) {
delete(rsrc);
resolver.commit();
@@ -242,7 +248,13 @@ public abstract class AbstractJobHandlin
resolver.close();
}
}
- // remove all configurations and clean content
+ // unregister all services
+ for(final ServiceRegistration<?> reg : this.registrations) {
+ reg.unregister();
+ }
+ this.registrations.clear();
+
+ // remove all configurations
try {
final org.osgi.service.cm.Configuration[] cfgs =
this.configAdmin.listConfigurations(null);
if ( cfgs != null ) {
@@ -271,15 +283,16 @@ public abstract class AbstractJobHandlin
props.put(EventConstants.EVENT_TOPIC, topic);
final ServiceRegistration<EventHandler> reg =
this.bc.registerService(EventHandler.class,
handler, props);
+ this.registrations.add(reg);
return reg;
}
protected long getConsumerChangeCount() {
long result = -1;
try {
- final ServiceReference[] refs =
this.bc.getServiceReferences(PropertyProvider.class.getName(),
"(changeCount=*)");
- if ( refs != null && refs.length > 0 ) {
- result = (Long)refs[0].getProperty("changeCount");
+ final Collection<ServiceReference<PropertyProvider>> refs =
this.bc.getServiceReferences(PropertyProvider.class, "(changeCount=*)");
+ if ( !refs.isEmpty() ) {
+ result =
(Long)refs.iterator().next().getProperty("changeCount");
}
} catch ( final InvalidSyntaxException ignore ) {
// ignore
@@ -309,6 +322,7 @@ public abstract class AbstractJobHandlin
props.put(JobConsumer.PROPERTY_TOPICS, topic);
final ServiceRegistration<JobConsumer> reg =
this.bc.registerService(JobConsumer.class,
handler, props);
+ this.registrations.add(reg);
this.waitConsumerChangeCount(cc + 1);
return reg;
}
@@ -323,7 +337,15 @@ public abstract class AbstractJobHandlin
props.put(JobConsumer.PROPERTY_TOPICS, topic);
final ServiceRegistration<JobExecutor> reg =
this.bc.registerService(JobExecutor.class,
handler, props);
+ this.registrations.add(reg);
this.waitConsumerChangeCount(cc + 1);
return reg;
}
+
+ protected void unregister(final ServiceRegistration<?> reg) {
+ if ( reg != null ) {
+ this.registrations.remove(reg);
+ reg.unregister();
+ }
+ }
}
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ChaosTest.java Fri Jan 8 16:51:13 2016
@@ -128,9 +128,9 @@ public class ChaosTest extends AbstractJ
/**
* Setup consumers
*/
- private void setupJobConsumers(final List<ServiceRegistration<?>>
registrations) {
+ private void setupJobConsumers() {
for(int i=0; i<NUM_ORDERED_TOPICS; i++) {
- registrations.add(this.registerJobConsumer(ORDERED_TOPICS[i],
+ this.registerJobConsumer(ORDERED_TOPICS[i],
new JobConsumer() {
@@ -138,10 +138,10 @@ public class ChaosTest extends AbstractJ
public JobResult process(final Job job) {
return JobResult.OK;
}
- }));
+ });
}
for(int i=0; i<NUM_PARALLEL_TOPICS; i++) {
- registrations.add(this.registerJobConsumer(PARALLEL_TOPICS[i],
+ this.registerJobConsumer(PARALLEL_TOPICS[i],
new JobConsumer() {
@@ -149,10 +149,10 @@ public class ChaosTest extends AbstractJ
public JobResult process(final Job job) {
return JobResult.OK;
}
- }));
+ });
}
for(int i=0; i<NUM_ROUND_TOPICS; i++) {
- registrations.add(this.registerJobConsumer(ROUND_TOPICS[i],
+ this.registerJobConsumer(ROUND_TOPICS[i],
new JobConsumer() {
@@ -160,7 +160,7 @@ public class ChaosTest extends AbstractJ
public JobResult process(final Job job) {
return JobResult.OK;
}
- }));
+ });
}
}
@@ -335,11 +335,10 @@ public class ChaosTest extends AbstractJ
topics.add(ROUND_TOPICS[i]);
}
- final List<ServiceRegistration<?>> registrations = new
ArrayList<ServiceRegistration<?>>();
final List<Thread> threads = new ArrayList<Thread>();
final AtomicLong finishedThreads = new AtomicLong();
- final ServiceRegistration<EventHandler> eventHandler =
this.registerEventHandler("org/apache/sling/event/notification/job/*",
+ this.registerEventHandler("org/apache/sling/event/notification/job/*",
new EventHandler() {
@Override
@@ -352,44 +351,44 @@ public class ChaosTest extends AbstractJ
}
}
});
- try {
- // setup job consumers
- this.setupJobConsumers(registrations);
-
- // setup job creation tests
- this.setupJobCreationThreads(threads, jobManager, created,
finishedThreads);
- this.setupChaosThreads(threads, finishedThreads);
+ // setup job consumers
+ this.setupJobConsumers();
- System.out.println("Starting threads...");
- // start threads
- for(final Thread t : threads) {
- t.setDaemon(true);
- t.start();
- }
+ // setup job creation tests
+ this.setupJobCreationThreads(threads, jobManager, created,
finishedThreads);
- System.out.println("Sleeping for " + DURATION + " seconds to wait for threads to finish...");
- // for sure we can sleep for the duration
- this.sleep(DURATION * 1000);
-
- System.out.println("Polling for threads to finish...");
- // wait until threads are finished
- while ( finishedThreads.get() < threads.size() ) {
- this.sleep(100);
- }
+ this.setupChaosThreads(threads, finishedThreads);
- System.out.println("Waiting for job handling to finish...");
- final Set<String> allTopics = new HashSet<String>(topics);
- while ( !allTopics.isEmpty() ) {
- final Iterator<String> iter = allTopics.iterator();
- while ( iter.hasNext() ) {
- final String topic = iter.next();
- if ( finished.get(topic).get() == created.get(topic).get()
) {
- iter.remove();
- }
+ System.out.println("Starting threads...");
+ // start threads
+ for(final Thread t : threads) {
+ t.setDaemon(true);
+ t.start();
+ }
+
+ System.out.println("Sleeping for " + DURATION + " seconds to wait for threads to finish...");
+ // for sure we can sleep for the duration
+ this.sleep(DURATION * 1000);
+
+ System.out.println("Polling for threads to finish...");
+ // wait until threads are finished
+ while ( finishedThreads.get() < threads.size() ) {
+ this.sleep(100);
+ }
+
+ System.out.println("Waiting for job handling to finish...");
+ final Set<String> allTopics = new HashSet<String>(topics);
+ while ( !allTopics.isEmpty() ) {
+ final Iterator<String> iter = allTopics.iterator();
+ while ( iter.hasNext() ) {
+ final String topic = iter.next();
+ if ( finished.get(topic).get() == created.get(topic).get() ) {
+ iter.remove();
}
- this.sleep(100);
}
+ this.sleep(100);
+ }
/* We could try to enable this with Oak again - but right now JR observation
handler is too
* slow.
System.out.println("Checking notifications...");
@@ -398,12 +397,5 @@ public class ChaosTest extends AbstractJ
}
*/
- } finally {
- eventHandler.unregister();
- for(final ServiceRegistration<?> reg : registrations) {
- reg.unregister();
- }
- }
-
}
}
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java Fri Jan 8 16:51:13 2016
@@ -44,7 +44,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
@@ -83,7 +82,7 @@ public class ClassloadingTest extends Ab
public void testSimpleClassloading() throws Exception {
final AtomicInteger processedJobsCount = new AtomicInteger(0);
final List<Event> finishedEvents = Collections.synchronizedList(new
ArrayList<Event>());
- final ServiceRegistration<JobConsumer> jcReg =
this.registerJobConsumer(TOPIC,
+ this.registerJobConsumer(TOPIC,
new JobConsumer() {
@Override
public JobResult process(Job job) {
@@ -91,7 +90,7 @@ public class ClassloadingTest extends Ab
return JobResult.OK;
}
});
- final ServiceRegistration<EventHandler> ehReg =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
new EventHandler() {
@Override
@@ -99,70 +98,65 @@ public class ClassloadingTest extends Ab
finishedEvents.add(event);
}
});
- try {
- final JobManager jobManager = this.getJobManager();
+ final JobManager jobManager = this.getJobManager();
- final List<String> list = new ArrayList<String>();
- list.add("1");
- list.add("2");
-
- final Map<String, String> map = new HashMap<String, String>();
- map.put("a", "a1");
- map.put("b", "b2");
-
- // we start a single job
- final Map<String, Object> props = new HashMap<String, Object>();
- props.put("string", "Hello");
- props.put("int", new Integer(5));
- props.put("long", new Long(7));
- props.put("list", list);
- props.put("map", map);
-
- final String jobId = jobManager.addJob(TOPIC, props).getId();
-
- new
RetryLoop(Conditions.collectionIsNotEmptyCondition(finishedEvents,
- "Waiting for finishedEvents to have at least one
element"), 5, 50);
-
- // no jobs queued, none processed and no available
- new RetryLoop(new RetryLoop.Condition() {
-
- @Override
- public String getDescription() {
- return "Waiting for job to be processed. Conditions: queuedJobs=" + jobManager.getStatistics().getNumberOfQueuedJobs() +
- ", jobsCount=" + processedJobsCount + ", findJobs=" +
- jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String, Object>[]) null)
- .size();
- }
-
- @Override
- public boolean isTrue() throws Exception {
- return jobManager.getStatistics().getNumberOfQueuedJobs()
== 0
- && processedJobsCount.get() == 1
- && jobManager.findJobs(JobManager.QueryType.ALL,
TOPIC, -1, (Map<String, Object>[]) null)
- .size() == 0;
- }
- }, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
-
- final String jobTopic =
(String)finishedEvents.get(0).getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
- assertNotNull(jobTopic);
- assertEquals("Hello", finishedEvents.get(0).getProperty("string"));
- assertEquals(new Integer(5),
Integer.valueOf(finishedEvents.get(0).getProperty("int").toString()));
- assertEquals(new Long(7),
Long.valueOf(finishedEvents.get(0).getProperty("long").toString()));
- assertEquals(list, finishedEvents.get(0).getProperty("list"));
- assertEquals(map, finishedEvents.get(0).getProperty("map"));
-
- jobManager.removeJobById(jobId);
- } finally {
- jcReg.unregister();
- ehReg.unregister();
- }
+ final List<String> list = new ArrayList<String>();
+ list.add("1");
+ list.add("2");
+
+ final Map<String, String> map = new HashMap<String, String>();
+ map.put("a", "a1");
+ map.put("b", "b2");
+
+ // we start a single job
+ final Map<String, Object> props = new HashMap<String, Object>();
+ props.put("string", "Hello");
+ props.put("int", new Integer(5));
+ props.put("long", new Long(7));
+ props.put("list", list);
+ props.put("map", map);
+
+ final String jobId = jobManager.addJob(TOPIC, props).getId();
+
+ new RetryLoop(Conditions.collectionIsNotEmptyCondition(finishedEvents,
+ "Waiting for finishedEvents to have at least one element"), 5,
50);
+
+ // no jobs queued, none processed and no available
+ new RetryLoop(new RetryLoop.Condition() {
+
+ @Override
+ public String getDescription() {
+ return "Waiting for job to be processed. Conditions: queuedJobs=" + jobManager.getStatistics().getNumberOfQueuedJobs() +
+ ", jobsCount=" + processedJobsCount + ", findJobs=" +
+ jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String, Object>[]) null)
+ .size();
+ }
+
+ @Override
+ public boolean isTrue() throws Exception {
+ return jobManager.getStatistics().getNumberOfQueuedJobs() == 0
+ && processedJobsCount.get() == 1
+ && jobManager.findJobs(JobManager.QueryType.ALL,
TOPIC, -1, (Map<String, Object>[]) null)
+ .size() == 0;
+ }
+ }, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
+
+ final String jobTopic =
(String)finishedEvents.get(0).getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
+ assertNotNull(jobTopic);
+ assertEquals("Hello", finishedEvents.get(0).getProperty("string"));
+ assertEquals(new Integer(5),
Integer.valueOf(finishedEvents.get(0).getProperty("int").toString()));
+ assertEquals(new Long(7),
Long.valueOf(finishedEvents.get(0).getProperty("long").toString()));
+ assertEquals(list, finishedEvents.get(0).getProperty("list"));
+ assertEquals(map, finishedEvents.get(0).getProperty("map"));
+
+ jobManager.removeJobById(jobId);
}
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testFailedClassloading() throws Exception {
final AtomicInteger failedJobsCount = new AtomicInteger(0);
final List<Event> finishedEvents = Collections.synchronizedList(new
ArrayList<Event>());
- final ServiceRegistration<JobConsumer> jcReg =
this.registerJobConsumer(TOPIC + "/failed",
+ this.registerJobConsumer(TOPIC + "/failed",
new JobConsumer() {
@Override
@@ -171,7 +165,7 @@ public class ClassloadingTest extends Ab
return JobResult.OK;
}
});
- final ServiceRegistration<EventHandler> ehReg =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
new EventHandler() {
@Override
@@ -179,52 +173,47 @@ public class ClassloadingTest extends Ab
finishedEvents.add(event);
}
});
- try {
- final JobManager jobManager = this.getJobManager();
+ final JobManager jobManager = this.getJobManager();
+
+ // dao is an invisible class for the dynamic class loader as it is not public
+ // therefore scheduling this job should fail!
+ final DataObject dao = new DataObject();
+
+ // we start a single job
+ final Map<String, Object> props = new HashMap<String, Object>();
+ props.put("dao", dao);
+
+ final String id = jobManager.addJob(TOPIC + "/failed", props).getId();
+
+ // wait until the conditions are met
+ new RetryLoop(new RetryLoop.Condition() {
+
+ @Override
+ public boolean isTrue() throws Exception {
+ return failedJobsCount.get() == 0
+ && finishedEvents.size() == 0
+ && jobManager.findJobs(JobManager.QueryType.ALL, TOPIC
+ "/failed", -1,
+ (Map<String, Object>[]) null).size() == 1
+ && jobManager.getStatistics().getNumberOfQueuedJobs()
== 0
+ && jobManager.getStatistics().getNumberOfActiveJobs()
== 0;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Waiting for job failure to be recorded. Conditions " +
+ "faildJobsCount=" + failedJobsCount.get() +
+ ", finishedEvents=" + finishedEvents.size() +
+ ", findJobs= " +
jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1,
+ (Map<String, Object>[]) null).size()
+ +", queuedJobs=" +
jobManager.getStatistics().getNumberOfQueuedJobs()
+ +", activeJobs=" +
jobManager.getStatistics().getNumberOfActiveJobs();
+ }
+ }, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
+
+ jobManager.removeJobById(id); // moves the job to the history section
+ assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC +
"/failed", -1, (Map<String, Object>[])null).size());
- // dao is an invisible class for the dynamic class loader as it is
not public
- // therefore scheduling this job should fail!
- final DataObject dao = new DataObject();
-
- // we start a single job
- final Map<String, Object> props = new HashMap<String, Object>();
- props.put("dao", dao);
-
- final String id = jobManager.addJob(TOPIC + "/failed",
props).getId();
-
- // wait until the conditions are met
- new RetryLoop(new RetryLoop.Condition() {
-
- @Override
- public boolean isTrue() throws Exception {
- return failedJobsCount.get() == 0
- && finishedEvents.size() == 0
- && jobManager.findJobs(JobManager.QueryType.ALL,
TOPIC + "/failed", -1,
- (Map<String, Object>[]) null).size() == 1
- &&
jobManager.getStatistics().getNumberOfQueuedJobs() == 0
- &&
jobManager.getStatistics().getNumberOfActiveJobs() == 0;
- }
-
- @Override
- public String getDescription() {
- return "Waiting for job failure to be recorded. Conditions
" +
- "faildJobsCount=" + failedJobsCount.get() +
- ", finishedEvents=" + finishedEvents.size() +
- ", findJobs= " +
jobManager.findJobs(JobManager.QueryType.ALL, TOPIC + "/failed", -1,
- (Map<String, Object>[]) null).size()
- +", queuedJobs=" +
jobManager.getStatistics().getNumberOfQueuedJobs()
- +", activeJobs=" +
jobManager.getStatistics().getNumberOfActiveJobs();
- }
- }, CONDITION_TIMEOUT_SECONDS, CONDITION_INTERVAL_MILLIS);
-
- jobManager.removeJobById(id); // moves the job to the history
section
- assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL,
TOPIC + "/failed", -1, (Map<String, Object>[])null).size());
-
- jobManager.removeJobById(id); // removes the job permanently
- } finally {
- jcReg.unregister();
- ehReg.unregister();
- }
+ jobManager.removeJobById(id); // removes the job permanently
}
private static final class DataObject implements Serializable {
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/HistoryTest.java Fri Jan 8 16:51:13 2016
@@ -41,7 +41,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
@RunWith(PaxExam.class)
public class HistoryTest extends AbstractJobHandlingTest {
@@ -88,7 +87,7 @@ public class HistoryTest extends Abstrac
*/
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testHistory() throws Exception {
- final ServiceRegistration reg = this.registerJobExecutor(TOPIC,
+ this.registerJobExecutor(TOPIC,
new JobExecutor() {
@Override
@@ -102,51 +101,41 @@ public class HistoryTest extends Abstrac
}
});
- Collection<Job> col = null;
- try {
- for(int i = 0; i< 10; i++) {
- this.addJob(i);
- }
- this.sleep(200L);
- while (
this.getJobManager().findJobs(JobManager.QueryType.HISTORY, TOPIC, -1,
(Map<String, Object>[])null).size() < 10 ) {
- this.sleep(20L);
- }
- col = this.getJobManager().findJobs(JobManager.QueryType.HISTORY,
TOPIC, -1, (Map<String, Object>[])null);
- assertEquals(10, col.size());
- assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.ACTIVE, TOPIC, -1,
(Map<String, Object>[])null).size());
- assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.QUEUED, TOPIC, -1,
(Map<String, Object>[])null).size());
- assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String,
Object>[])null).size());
- assertEquals(3,
this.getJobManager().findJobs(JobManager.QueryType.CANCELLED, TOPIC, -1,
(Map<String, Object>[])null).size());
- assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.DROPPED, TOPIC, -1,
(Map<String, Object>[])null).size());
- assertEquals(3,
this.getJobManager().findJobs(JobManager.QueryType.ERROR, TOPIC, -1,
(Map<String, Object>[])null).size());
- assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.GIVEN_UP, TOPIC, -1,
(Map<String, Object>[])null).size());
- assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.STOPPED, TOPIC, -1,
(Map<String, Object>[])null).size());
- assertEquals(7,
this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, TOPIC, -1,
(Map<String, Object>[])null).size());
-
- // find all topics
- assertEquals(7,
this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, null, -1,
(Map<String, Object>[])null).size());
-
- // verify order, message and state
- long last = 9;
- for(final Job j : col) {
- assertNotNull(j.getFinishedDate());
- final long count = j.getProperty(PROP_COUNTER, Long.class);
- assertEquals(last, count);
- if ( count == 2 || count == 5 || count == 7 ) {
- assertEquals(Job.JobState.ERROR, j.getJobState());
- } else {
- assertEquals(Job.JobState.SUCCEEDED, j.getJobState());
- }
- assertEquals(j.getJobState().name(), j.getResultMessage());
- last--;
- }
- } finally {
- if ( col != null ) {
- for(final Job j : col) {
- this.getJobManager().removeJobById(j.getId());
- }
+ for(int i = 0; i< 10; i++) {
+ this.addJob(i);
+ }
+ this.sleep(200L);
+ while ( this.getJobManager().findJobs(JobManager.QueryType.HISTORY,
TOPIC, -1, (Map<String, Object>[])null).size() < 10 ) {
+ this.sleep(20L);
+ }
+ Collection<Job> col =
this.getJobManager().findJobs(JobManager.QueryType.HISTORY, TOPIC, -1,
(Map<String, Object>[])null);
+ assertEquals(10, col.size());
+ assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.ACTIVE, TOPIC, -1,
(Map<String, Object>[])null).size());
+ assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.QUEUED, TOPIC, -1,
(Map<String, Object>[])null).size());
+ assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String,
Object>[])null).size());
+ assertEquals(3,
this.getJobManager().findJobs(JobManager.QueryType.CANCELLED, TOPIC, -1,
(Map<String, Object>[])null).size());
+ assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.DROPPED, TOPIC, -1,
(Map<String, Object>[])null).size());
+ assertEquals(3,
this.getJobManager().findJobs(JobManager.QueryType.ERROR, TOPIC, -1,
(Map<String, Object>[])null).size());
+ assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.GIVEN_UP, TOPIC, -1,
(Map<String, Object>[])null).size());
+ assertEquals(0,
this.getJobManager().findJobs(JobManager.QueryType.STOPPED, TOPIC, -1,
(Map<String, Object>[])null).size());
+ assertEquals(7,
this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, TOPIC, -1,
(Map<String, Object>[])null).size());
+
+ // find all topics
+ assertEquals(7,
this.getJobManager().findJobs(JobManager.QueryType.SUCCEEDED, null, -1,
(Map<String, Object>[])null).size());
+
+ // verify order, message and state
+ long last = 9;
+ for(final Job j : col) {
+ assertNotNull(j.getFinishedDate());
+ final long count = j.getProperty(PROP_COUNTER, Long.class);
+ assertEquals(last, count);
+ if ( count == 2 || count == 5 || count == 7 ) {
+ assertEquals(Job.JobState.ERROR, j.getJobState());
+ } else {
+ assertEquals(Job.JobState.SUCCEEDED, j.getJobState());
}
- reg.unregister();
+ assertEquals(j.getJobState().name(), j.getResultMessage());
+ last--;
}
}
}
\ No newline at end of file
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java Fri Jan 8 16:51:13 2016
@@ -49,7 +49,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
@@ -90,7 +89,7 @@ public class JobHandlingTest extends Abs
public void testSimpleJobExecutionUsingJobConsumer() throws Exception {
final Barrier cb = new Barrier(2);
- final ServiceRegistration reg = this.registerJobConsumer(TOPIC,
+ this.registerJobConsumer(TOPIC,
new JobConsumer() {
@Override
@@ -100,14 +99,10 @@ public class JobHandlingTest extends Abs
}
});
- try {
- this.getJobManager().addJob(TOPIC, null);
- assertTrue("No event received in the given time.", cb.block(5));
- cb.reset();
- assertFalse("Unexpected event received in the given time.",
cb.block(5));
- } finally {
- reg.unregister();
- }
+ this.getJobManager().addJob(TOPIC, null);
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ assertFalse("Unexpected event received in the given time.",
cb.block(5));
}
/**
@@ -118,7 +113,7 @@ public class JobHandlingTest extends Abs
public void testSimpleJobExecutionUsingJobExecutor() throws Exception {
final Barrier cb = new Barrier(2);
- final ServiceRegistration reg = this.registerJobExecutor(TOPIC,
+ this.registerJobExecutor(TOPIC,
new JobExecutor() {
@Override
@@ -128,19 +123,15 @@ public class JobHandlingTest extends Abs
}
});
- try {
- this.getJobManager().addJob(TOPIC, null);
- assertTrue("No event received in the given time.", cb.block(5));
- cb.reset();
- assertFalse("Unexpected event received in the given time.",
cb.block(5));
- } finally {
- reg.unregister();
- }
+ this.getJobManager().addJob(TOPIC, null);
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ assertFalse("Unexpected event received in the given time.",
cb.block(5));
}
@Test(timeout = DEFAULT_TEST_TIMEOUT)
public void testManyJobs() throws Exception {
- final ServiceRegistration reg1 = this.registerJobConsumer(TOPIC,
+ this.registerJobConsumer(TOPIC,
new JobConsumer() {
@Override
@@ -150,7 +141,7 @@ public class JobHandlingTest extends Abs
});
final AtomicInteger count = new AtomicInteger(0);
- final ServiceRegistration reg2 =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
new EventHandler() {
@Override
public void handleEvent(final Event event) {
@@ -158,23 +149,16 @@ public class JobHandlingTest extends Abs
}
});
- try {
- // we start "some" jobs
- final int COUNT = 300;
- for(int i = 0; i < COUNT; i++ ) {
- this.getJobManager().addJob(TOPIC, null);
- }
- while ( count.get() < COUNT ) {
- this.sleep(50);
- }
- assertEquals("Finished count", COUNT, count.get());
- assertEquals("Finished count", COUNT,
this.getJobManager().getStatistics().getNumberOfFinishedJobs());
- } finally {
- reg1.unregister();
- reg2.unregister();
+ // we start "some" jobs
+ final int COUNT = 300;
+ for(int i = 0; i < COUNT; i++ ) {
+ this.getJobManager().addJob(TOPIC, null);
}
- // we put an extra sleep to see whether this fixes the test problems on Java 8
- this.sleep(5000);
+ while ( count.get() < COUNT ) {
+ this.sleep(50);
+ }
+ assertEquals("Finished count", COUNT, count.get());
+ assertEquals("Finished count", COUNT,
this.getJobManager().getStatistics().getNumberOfFinishedJobs());
}
/**
@@ -185,7 +169,7 @@ public class JobHandlingTest extends Abs
public void testCancelJob() throws Exception {
final Barrier cb = new Barrier(2);
final Barrier cb2 = new Barrier(2);
- final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+ this.registerJobConsumer(TOPIC,
new JobConsumer() {
@Override
@@ -195,41 +179,38 @@ public class JobHandlingTest extends Abs
return JobResult.FAILED;
}
});
+
+ final Map<String, Object> jobProperties =
Collections.singletonMap("id", (Object)"cancelJobId");
+ @SuppressWarnings("unchecked")
+ final Map<String, Object>[] jobPropertiesAsArray = new Map[1];
+ jobPropertiesAsArray[0] = jobProperties;
+
+ // create job
+ final JobManager jobManager = this.getJobManager();
+ jobManager.addJob(TOPIC, jobProperties);
+ cb.block();
+
+ assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC,
-1, jobPropertiesAsArray).size());
+ // job is currently waiting, therefore cancel fails
+ final Job e1 = jobManager.getJob(TOPIC, jobProperties);
+ assertNotNull(e1);
+ cb2.block(); // and continue job
+
+ sleep(200);
+
+ // the job is now in the queue again
+ final Job e2 = jobManager.getJob(TOPIC, jobProperties);
+ assertNotNull(e2);
+ assertTrue(jobManager.removeJobById(e2.getId()));
+ assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC,
-1, jobPropertiesAsArray).size());
+ final Collection<Job> col =
jobManager.findJobs(JobManager.QueryType.HISTORY, TOPIC, -1,
+ jobPropertiesAsArray);
try {
- final Map<String, Object> jobProperties =
Collections.singletonMap("id", (Object)"cancelJobId");
- @SuppressWarnings("unchecked")
- final Map<String, Object>[] jobPropertiesAsArray = new Map[1];
- jobPropertiesAsArray[0] = jobProperties;
-
- // create job
- final JobManager jobManager = this.getJobManager();
- jobManager.addJob(TOPIC, jobProperties);
- cb.block();
-
- assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL,
TOPIC, -1, jobPropertiesAsArray).size());
- // job is currently waiting, therefore cancel fails
- final Job e1 = jobManager.getJob(TOPIC, jobProperties);
- assertNotNull(e1);
- cb2.block(); // and continue job
-
- sleep(200);
-
- // the job is now in the queue again
- final Job e2 = jobManager.getJob(TOPIC, jobProperties);
- assertNotNull(e2);
- assertTrue(jobManager.removeJobById(e2.getId()));
- assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL,
TOPIC, -1, jobPropertiesAsArray).size());
- final Collection<Job> col =
jobManager.findJobs(JobManager.QueryType.HISTORY, TOPIC, -1,
- jobPropertiesAsArray);
- try {
- assertEquals(1, col.size());
- } finally {
- for(final Job j : col) {
- jobManager.removeJobById(j.getId());
- }
- }
+ assertEquals(1, col.size());
} finally {
- jcReg.unregister();
+ for(final Job j : col) {
+ jobManager.removeJobById(j.getId());
+ }
}
}
@@ -240,7 +221,7 @@ public class JobHandlingTest extends Abs
public void testGetJob() throws Exception {
final Barrier cb = new Barrier(2);
final Barrier cb2 = new Barrier(2);
- final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+ this.registerJobConsumer(TOPIC,
new JobConsumer() {
@Override
@@ -250,19 +231,15 @@ public class JobHandlingTest extends Abs
return JobResult.OK;
}
});
- try {
- final JobManager jobManager = this.getJobManager();
- final Job j = jobManager.addJob(TOPIC, null);
- cb.block();
+ final JobManager jobManager = this.getJobManager();
+ final Job j = jobManager.addJob(TOPIC, null);
+ cb.block();
- assertNotNull(jobManager.getJob(TOPIC, null));
+ assertNotNull(jobManager.getJob(TOPIC, null));
- cb2.block(); // and continue job
+ cb2.block(); // and continue job
- jobManager.removeJobById(j.getId());
- } finally {
- jcReg.unregister();
- }
+ jobManager.removeJobById(j.getId());
}
/**
@@ -273,7 +250,8 @@ public class JobHandlingTest extends Abs
public void testStartJobAndReschedule() throws Exception {
final List<Integer> retryCountList = new ArrayList<Integer>();
final Barrier cb = new Barrier(2);
- final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+
+ this.registerJobConsumer(TOPIC,
new JobConsumer() {
int retryCount;
@@ -291,26 +269,23 @@ public class JobHandlingTest extends Abs
return JobResult.FAILED;
}
});
- try {
- final JobManager jobManager = this.getJobManager();
- final Job job = jobManager.addJob(TOPIC, null);
- assertTrue("No event received in the given time.", cb.block(5));
- cb.reset();
- // the job is retried after two seconds, so we wait again
- assertTrue("No event received in the given time.", cb.block(5));
- cb.reset();
- // the job is retried after two seconds, so we wait again
- assertTrue("No event received in the given time.", cb.block(5));
- // we have reached the retry so we expect to not get an event
- cb.reset();
- assertFalse("Unexpected event received in the given time.",
cb.block(5));
- assertEquals("Unexpected number of retries", 3,
retryCountList.size());
+ final JobManager jobManager = this.getJobManager();
+ final Job job = jobManager.addJob(TOPIC, null);
- jobManager.removeJobById(job.getId());
- } finally {
- jcReg.unregister();
- }
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ // the job is retried after two seconds, so we wait again
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+ // the job is retried after two seconds, so we wait again
+ assertTrue("No event received in the given time.", cb.block(5));
+ // we have reached the retry so we expect to not get an event
+ cb.reset();
+ assertFalse("Unexpected event received in the given time.",
cb.block(5));
+ assertEquals("Unexpected number of retries", 3, retryCountList.size());
+
+ jobManager.removeJobById(job.getId());
}
/**
@@ -324,7 +299,7 @@ public class JobHandlingTest extends Abs
final List<String> failed = Collections.synchronizedList(new
ArrayList<String>());
final List<String> finished = Collections.synchronizedList(new
ArrayList<String>());
final List<String> started = Collections.synchronizedList(new
ArrayList<String>());
- final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC,
+ this.registerJobConsumer(TOPIC,
new JobConsumer() {
@Override
@@ -362,7 +337,7 @@ public class JobHandlingTest extends Abs
return JobResult.FAILED;
}
});
- final ServiceRegistration eh1Reg =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_CANCELLED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_CANCELLED,
new EventHandler() {
@Override
@@ -371,7 +346,7 @@ public class JobHandlingTest extends Abs
cancelled.add(id);
}
});
- final ServiceRegistration eh2Reg =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FAILED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_FAILED,
new EventHandler() {
@Override
@@ -380,7 +355,7 @@ public class JobHandlingTest extends Abs
failed.add(id);
}
});
- final ServiceRegistration eh3Reg =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
new EventHandler() {
@Override
@@ -389,7 +364,7 @@ public class JobHandlingTest extends Abs
finished.add(id);
}
});
- final ServiceRegistration eh4Reg =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_STARTED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_STARTED,
new EventHandler() {
@Override
@@ -400,37 +375,26 @@ public class JobHandlingTest extends Abs
});
final JobManager jobManager = this.getJobManager();
- try {
- jobManager.addJob(TOPIC, Collections.singletonMap("id",
(Object)"1"));
- jobManager.addJob(TOPIC, Collections.singletonMap("id",
(Object)"2"));
- jobManager.addJob(TOPIC, Collections.singletonMap("id",
(Object)"3"));
- jobManager.addJob(TOPIC, Collections.singletonMap("id",
(Object)"4"));
- jobManager.addJob(TOPIC, Collections.singletonMap("id",
(Object)"5"));
-
- int count = 0;
- final long startTime = System.currentTimeMillis();
- do {
- count = finished.size() + cancelled.size();
- // after 25 seconds we cancel the test
- if ( System.currentTimeMillis() - startTime > 25000 ) {
- throw new Exception("Timeout during notification test.");
- }
- } while ( count < 5 || started.size() < 10 );
- assertEquals("Finished count", 4, finished.size());
- assertEquals("Cancelled count", 1, cancelled.size());
- assertEquals("Started count", 10, started.size());
- assertEquals("Failed count", 5, failed.size());
- } finally {
- final Collection<Job> col =
jobManager.findJobs(JobManager.QueryType.HISTORY, "sling/test", -1,
(Map<String, Object>[])null);
- for(final Job j : col) {
- jobManager.removeJobById(j.getId());
+
+ jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"1"));
+ jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"2"));
+ jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"3"));
+ jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"4"));
+ jobManager.addJob(TOPIC, Collections.singletonMap("id", (Object)"5"));
+
+ int count = 0;
+ final long startTime = System.currentTimeMillis();
+ do {
+ count = finished.size() + cancelled.size();
+ // after 25 seconds we cancel the test
+ if ( System.currentTimeMillis() - startTime > 25000 ) {
+ throw new Exception("Timeout during notification test.");
}
- jcReg.unregister();
- eh1Reg.unregister();
- eh2Reg.unregister();
- eh3Reg.unregister();
- eh4Reg.unregister();
- }
+ } while ( count < 5 || started.size() < 10 );
+ assertEquals("Finished count", 4, finished.size());
+ assertEquals("Cancelled count", 1, cancelled.size());
+ assertEquals("Started count", 10, started.size());
+ assertEquals("Failed count", 5, failed.size());
}
/**
@@ -440,7 +404,7 @@ public class JobHandlingTest extends Abs
public void testNoJobProcessor() throws Exception {
final AtomicInteger count = new AtomicInteger(0);
- final ServiceRegistration eh1 = this.registerJobConsumer(TOPIC,
+ this.registerJobConsumer(TOPIC,
new JobConsumer() {
@Override
@@ -451,31 +415,22 @@ public class JobHandlingTest extends Abs
}
});
- try {
- final JobManager jobManager = this.getJobManager();
-
- // we start 20 jobs, every second job has no processor
- final int COUNT = 20;
- for(int i = 0; i < COUNT; i++ ) {
- final String jobTopic = (i % 2 == 0 ? TOPIC : TOPIC + "2");
-
- jobManager.addJob(jobTopic, null);
- }
- while ( jobManager.getStatistics().getNumberOfFinishedJobs() <
COUNT / 2) {
- this.sleep(50);
- }
+ final JobManager jobManager = this.getJobManager();
- assertEquals("Finished count", COUNT / 2, count.get());
- // unprocessed count should be 0 as there is no job consumer for this job
- assertEquals("Unprocessed count", 0,
jobManager.getStatistics().getNumberOfJobs());
- assertEquals("Finished count", COUNT / 2,
jobManager.getStatistics().getNumberOfFinishedJobs());
+ // we start 20 jobs, every second job has no processor
+ final int COUNT = 20;
+ for(int i = 0; i < COUNT; i++ ) {
+ final String jobTopic = (i % 2 == 0 ? TOPIC : TOPIC + "2");
- // now remove jobs
- for(final Job j : jobManager.findJobs(JobManager.QueryType.ALL,
TOPIC + "2", -1, (Map<String, Object>[])null)) {
- jobManager.removeJobById(j.getId());
- }
- } finally {
- eh1.unregister();
+ jobManager.addJob(jobTopic, null);
+ }
+ while ( jobManager.getStatistics().getNumberOfFinishedJobs() < COUNT /
2) {
+ this.sleep(50);
}
+
+ assertEquals("Finished count", COUNT / 2, count.get());
+ // unprocessed count should be 0 as there is no job consumer for this job
+ assertEquals("Unprocessed count", 0,
jobManager.getStatistics().getNumberOfJobs());
+ assertEquals("Finished count", COUNT / 2,
jobManager.getStatistics().getNumberOfFinishedJobs());
}
}
\ No newline at end of file
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
Fri Jan 8 16:51:13 2016
@@ -44,7 +44,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
@@ -86,7 +85,7 @@ public class OrderedQueueTest extends Ab
final Barrier cb = new Barrier(2);
final AtomicInteger count = new AtomicInteger(0);
final AtomicInteger parallelCount = new AtomicInteger(0);
- final ServiceRegistration<JobConsumer> jcReg =
this.registerJobConsumer("sling/orderedtest/*",
+ this.registerJobConsumer("sling/orderedtest/*",
new JobConsumer() {
private volatile int lastCounter = -1;
@@ -123,7 +122,7 @@ public class OrderedQueueTest extends Ab
return JobResult.OK;
}
});
- final ServiceRegistration<EventHandler> ehReg =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
new EventHandler() {
@Override
@@ -132,48 +131,43 @@ public class OrderedQueueTest extends Ab
}
});
- try {
- // we first sent one event to get the queue started
- final Map<String, Object> properties = new HashMap<String,
Object>();
- properties.put("counter", -1);
- jobManager.addJob("sling/orderedtest/start", properties);
- assertTrue("No event received in the given time.", cb.block(5));
- cb.reset();
-
- // get the queue
- final Queue q = jobManager.getQueue("orderedtest");
- assertNotNull("Queue 'orderedtest' should exist!", q);
-
- // suspend it
- q.suspend();
-
- final int NUM_JOBS = 30;
-
- // we start "some" jobs:
- for(int i = 0; i < NUM_JOBS; i++ ) {
- final String subTopic = "sling/orderedtest/sub" + (i % 10);
- properties.clear();
- properties.put("counter", i);
- jobManager.addJob(subTopic, properties);
- }
- // start the queue
- q.resume();
- while ( count.get() < NUM_JOBS +1 ) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException ie) {
- // ignore
- }
+ // we first sent one event to get the queue started
+ final Map<String, Object> properties = new HashMap<String, Object>();
+ properties.put("counter", -1);
+ jobManager.addJob("sling/orderedtest/start", properties);
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+
+ // get the queue
+ final Queue q = jobManager.getQueue("orderedtest");
+ assertNotNull("Queue 'orderedtest' should exist!", q);
+
+ // suspend it
+ q.suspend();
+
+ final int NUM_JOBS = 30;
+
+ // we start "some" jobs:
+ for(int i = 0; i < NUM_JOBS; i++ ) {
+ final String subTopic = "sling/orderedtest/sub" + (i % 10);
+ properties.clear();
+ properties.put("counter", i);
+ jobManager.addJob(subTopic, properties);
+ }
+ // start the queue
+ q.resume();
+ while ( count.get() < NUM_JOBS +1 ) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ie) {
+ // ignore
}
- // we started one event before the test, so add one
- assertEquals("Finished count", NUM_JOBS + 1, count.get());
- assertEquals("Finished count", NUM_JOBS + 1,
jobManager.getStatistics().getNumberOfFinishedJobs());
- assertEquals("Finished count", NUM_JOBS + 1,
q.getStatistics().getNumberOfFinishedJobs());
- assertEquals("Failed count", NUM_JOBS / 10,
q.getStatistics().getNumberOfFailedJobs());
- assertEquals("Cancelled count", 0,
q.getStatistics().getNumberOfCancelledJobs());
- } finally {
- jcReg.unregister();
- ehReg.unregister();
}
+ // we started one event before the test, so add one
+ assertEquals("Finished count", NUM_JOBS + 1, count.get());
+ assertEquals("Finished count", NUM_JOBS + 1,
jobManager.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Finished count", NUM_JOBS + 1,
q.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Failed count", NUM_JOBS / 10,
q.getStatistics().getNumberOfFailedJobs());
+ assertEquals("Cancelled count", 0,
q.getStatistics().getNumberOfCancelledJobs());
}
}
\ No newline at end of file
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
Fri Jan 8 16:51:13 2016
@@ -44,7 +44,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
@@ -87,7 +86,7 @@ public class RoundRobinQueueTest extends
final Barrier cb = new Barrier(2);
- final ServiceRegistration jc1Reg = this.registerJobConsumer(TOPIC +
"/start",
+ this.registerJobConsumer(TOPIC + "/start",
new JobConsumer() {
@Override
@@ -102,7 +101,7 @@ public class RoundRobinQueueTest extends
final AtomicInteger parallelCount = new AtomicInteger(0);
final Set<Integer> maxParticipants = new HashSet<Integer>();
- final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC +
"/*",
+ this.registerJobConsumer(TOPIC + "/*",
new JobConsumer() {
@Override
@@ -120,7 +119,7 @@ public class RoundRobinQueueTest extends
return JobResult.OK;
}
});
- final ServiceRegistration ehReg =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
new EventHandler() {
@Override
@@ -129,51 +128,45 @@ public class RoundRobinQueueTest extends
}
});
- try {
- // we first sent one event to get the queue started
- jobManager.addJob(TOPIC + "/start", null);
- assertTrue("No event received in the given time.", cb.block(5));
- cb.reset();
-
- // get the queue
- final Queue q = jobManager.getQueue(QUEUE_NAME);
- assertNotNull("Queue '" + QUEUE_NAME + "' should exist!", q);
-
- // suspend it
- q.suspend();
-
- // we start "some" jobs:
- for(int i = 0; i < NUM_JOBS; i++ ) {
- final String subTopic = TOPIC + "/sub" + (i % 10);
- final Map<String, Object> props = new HashMap<String,
Object>();
- if ( i < 10 ) {
- props.put("sleep", 300);
- } else {
- props.put("sleep", 30);
- }
- jobManager.addJob(subTopic, props);
+ // we first sent one event to get the queue started
+ jobManager.addJob(TOPIC + "/start", null);
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+
+ // get the queue
+ final Queue q = jobManager.getQueue(QUEUE_NAME);
+ assertNotNull("Queue '" + QUEUE_NAME + "' should exist!", q);
+
+ // suspend it
+ q.suspend();
+
+ // we start "some" jobs:
+ for(int i = 0; i < NUM_JOBS; i++ ) {
+ final String subTopic = TOPIC + "/sub" + (i % 10);
+ final Map<String, Object> props = new HashMap<String, Object>();
+ if ( i < 10 ) {
+ props.put("sleep", 300);
+ } else {
+ props.put("sleep", 30);
}
- // start the queue
- q.resume();
- while ( count.get() < NUM_JOBS + 1 ) {
- assertEquals("Failed count", 0,
q.getStatistics().getNumberOfFailedJobs());
- assertEquals("Cancelled count", 0,
q.getStatistics().getNumberOfCancelledJobs());
- sleep(300);
- }
- // we started one event before the test, so add one
- assertEquals("Finished count", NUM_JOBS + 1, count.get());
- assertEquals("Finished count", NUM_JOBS + 1,
jobManager.getStatistics().getNumberOfFinishedJobs());
- assertEquals("Finished count", NUM_JOBS + 1,
q.getStatistics().getNumberOfFinishedJobs());
+ jobManager.addJob(subTopic, props);
+ }
+ // start the queue
+ q.resume();
+ while ( count.get() < NUM_JOBS + 1 ) {
assertEquals("Failed count", 0,
q.getStatistics().getNumberOfFailedJobs());
assertEquals("Cancelled count", 0,
q.getStatistics().getNumberOfCancelledJobs());
- for(int i=1; i <= MAX_PAR; i++) {
- assertTrue("# Participants " + String.valueOf(i) + " not in "
+ maxParticipants,
- maxParticipants.contains(i));
- }
- } finally {
- jc1Reg.unregister();
- jcReg.unregister();
- ehReg.unregister();
+ sleep(300);
+ }
+ // we started one event before the test, so add one
+ assertEquals("Finished count", NUM_JOBS + 1, count.get());
+ assertEquals("Finished count", NUM_JOBS + 1,
jobManager.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Finished count", NUM_JOBS + 1,
q.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Failed count", 0,
q.getStatistics().getNumberOfFailedJobs());
+ assertEquals("Cancelled count", 0,
q.getStatistics().getNumberOfCancelledJobs());
+ for(int i=1; i <= MAX_PAR; i++) {
+ assertTrue("# Participants " + String.valueOf(i) + " not in " +
maxParticipants,
+ maxParticipants.contains(i));
}
}
}
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/SchedulingTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/SchedulingTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/SchedulingTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/SchedulingTest.java
Fri Jan 8 16:51:13 2016
@@ -32,7 +32,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
@RunWith(PaxExam.class)
public class SchedulingTest extends AbstractJobHandlingTest {
@@ -57,7 +56,7 @@ public class SchedulingTest extends Abst
public void testScheduling() throws Exception {
final AtomicInteger counter = new AtomicInteger();
- final ServiceRegistration ehReg = this.registerJobConsumer(TOPIC, new
JobConsumer() {
+ this.registerJobConsumer(TOPIC, new JobConsumer() {
@Override
public JobResult process(final Job job) {
@@ -68,26 +67,21 @@ public class SchedulingTest extends Abst
}
});
- try {
- // we schedule three jobs
- final ScheduledJobInfo info1 =
this.getJobManager().createJob(TOPIC).schedule().hourly(5).add();
- assertNotNull(info1);
- final ScheduledJobInfo info2 =
this.getJobManager().createJob(TOPIC).schedule().daily(10, 5).add();
- assertNotNull(info2);
- final ScheduledJobInfo info3 =
this.getJobManager().createJob(TOPIC).schedule().weekly(3, 19, 12).add();
- assertNotNull(info3);
-
- assertEquals(3, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
- info3.unschedule();
- assertEquals(2, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
- info1.unschedule();
- assertEquals(1, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
- info2.unschedule();
- assertEquals(0, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
- } finally {
- ehReg.unregister();
- }
+ // we schedule three jobs
+ final ScheduledJobInfo info1 =
this.getJobManager().createJob(TOPIC).schedule().hourly(5).add();
+ assertNotNull(info1);
+ final ScheduledJobInfo info2 =
this.getJobManager().createJob(TOPIC).schedule().daily(10, 5).add();
+ assertNotNull(info2);
+ final ScheduledJobInfo info3 =
this.getJobManager().createJob(TOPIC).schedule().weekly(3, 19, 12).add();
+ assertNotNull(info3);
+
+ assertEquals(3, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
+ info3.unschedule();
+ assertEquals(2, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
+ info1.unschedule();
+ assertEquals(1, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
+ info2.unschedule();
+ assertEquals(0, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
}
-
}
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
Fri Jan 8 16:51:13 2016
@@ -33,7 +33,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
@RunWith(PaxExam.class)
public class TimedJobsTest extends AbstractJobHandlingTest {
@@ -58,7 +57,7 @@ public class TimedJobsTest extends Abstr
public void testTimedJob() throws Exception {
final AtomicInteger counter = new AtomicInteger();
- final ServiceRegistration ehReg = this.registerJobConsumer(TOPIC, new
JobConsumer() {
+ this.registerJobConsumer(TOPIC, new JobConsumer() {
@Override
public JobResult process(final Job job) {
@@ -69,22 +68,19 @@ public class TimedJobsTest extends Abstr
}
});
- try {
- final Date d = new Date();
- d.setTime(System.currentTimeMillis() + 3000); // run in 3 seconds
-
- // create scheduled job
- final ScheduledJobInfo info =
this.getJobManager().createJob(TOPIC).schedule().at(d).add();
- assertNotNull(info);
- while ( counter.get() == 0 ) {
- this.sleep(1000);
- }
- assertEquals(0, this.getJobManager().getScheduledJobs().size()); // job is not scheduled anymore
- info.unschedule();
- } finally {
- ehReg.unregister();
+ final Date d = new Date();
+ d.setTime(System.currentTimeMillis() + 3000); // run in 3 seconds
+
+ // create scheduled job
+ final ScheduledJobInfo info =
this.getJobManager().createJob(TOPIC).schedule().at(d).add();
+ assertNotNull(info);
+
+ while ( counter.get() == 0 ) {
+ this.sleep(1000);
}
+ assertEquals(0, this.getJobManager().getScheduledJobs().size()); // job is not scheduled anymore
+ info.unschedule();
}
}
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TopicMatchingTest.java
Fri Jan 8 16:51:13 2016
@@ -61,7 +61,7 @@ public class TopicMatchingTest extends A
public void testSimpleMatching() throws Exception {
final Barrier barrier = new Barrier(2);
- final ServiceRegistration<JobExecutor> reg =
this.registerJobExecutor("sling/test/*",
+ this.registerJobExecutor("sling/test/*",
new JobExecutor() {
@Override
@@ -69,7 +69,7 @@ public class TopicMatchingTest extends A
return context.result().succeeded();
}
});
- final ServiceRegistration<EventHandler> eventHandler =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
new EventHandler() {
@Override
@@ -78,13 +78,8 @@ public class TopicMatchingTest extends A
}
});
- try {
- this.getJobManager().addJob(TOPIC, null);
- barrier.block();
- } finally {
- reg.unregister();
- eventHandler.unregister();
- }
+ this.getJobManager().addJob(TOPIC, null);
+ barrier.block();
}
/**
@@ -94,7 +89,7 @@ public class TopicMatchingTest extends A
public void testDeepMatching() throws Exception {
final Barrier barrier = new Barrier(2);
- final ServiceRegistration<JobExecutor> reg =
this.registerJobExecutor("sling/**",
+ this.registerJobExecutor("sling/**",
new JobExecutor() {
@Override
@@ -102,7 +97,7 @@ public class TopicMatchingTest extends A
return context.result().succeeded();
}
});
- final ServiceRegistration<EventHandler> eventHandler =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
new EventHandler() {
@Override
@@ -111,13 +106,8 @@ public class TopicMatchingTest extends A
}
});
- try {
- this.getJobManager().addJob(TOPIC, null);
- barrier.block();
- } finally {
- reg.unregister();
- eventHandler.unregister();
- }
+ this.getJobManager().addJob(TOPIC, null);
+ barrier.block();
}
/**
@@ -129,7 +119,7 @@ public class TopicMatchingTest extends A
final Barrier barrier2 = new Barrier(2);
final Barrier barrier3 = new Barrier(2);
- final ServiceRegistration<JobExecutor> reg1 =
this.registerJobExecutor("sling/**",
+ this.registerJobExecutor("sling/**",
new JobExecutor() {
@Override
@@ -163,17 +153,16 @@ public class TopicMatchingTest extends A
// second test, unregister reg3, now it should be reg2
long cc = this.getConsumerChangeCount();
- reg3.unregister();
+ this.unregister(reg3);
this.waitConsumerChangeCount(cc + 1);
this.getJobManager().addJob(TOPIC, null);
barrier2.block();
// third test, unregister reg2, reg1 is now the only one
cc = this.getConsumerChangeCount();
- reg2.unregister();
+ this.unregister(reg2);
this.waitConsumerChangeCount(cc + 1);
this.getJobManager().addJob(TOPIC, null);
barrier1.block();
- reg1.unregister();
}
}
Modified:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/UnorderedQueueTest.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/UnorderedQueueTest.java?rev=1723760&r1=1723759&r2=1723760&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/UnorderedQueueTest.java
(original)
+++
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/UnorderedQueueTest.java
Fri Jan 8 16:51:13 2016
@@ -44,7 +44,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.ServiceRegistration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
@@ -87,7 +86,7 @@ public class UnorderedQueueTest extends
final Barrier cb = new Barrier(2);
- final ServiceRegistration jc1Reg = this.registerJobConsumer(TOPIC +
"/start",
+ this.registerJobConsumer(TOPIC + "/start",
new JobConsumer() {
@Override
@@ -102,7 +101,7 @@ public class UnorderedQueueTest extends
final AtomicInteger parallelCount = new AtomicInteger(0);
final Set<Integer> maxParticipants = new HashSet<Integer>();
- final ServiceRegistration jcReg = this.registerJobConsumer(TOPIC +
"/*",
+ this.registerJobConsumer(TOPIC + "/*",
new JobConsumer() {
@Override
@@ -120,7 +119,7 @@ public class UnorderedQueueTest extends
return JobResult.OK;
}
});
- final ServiceRegistration ehReg =
this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
+ this.registerEventHandler(NotificationConstants.TOPIC_JOB_FINISHED,
new EventHandler() {
@Override
@@ -129,51 +128,45 @@ public class UnorderedQueueTest extends
}
});
- try {
- // we first sent one event to get the queue started
- jobManager.addJob(TOPIC + "/start", null);
- assertTrue("No event received in the given time.", cb.block(5));
- cb.reset();
-
- // get the queue
- final Queue q = jobManager.getQueue(QUEUE_NAME);
- assertNotNull("Queue '" + QUEUE_NAME + "' should exist!", q);
-
- // suspend it
- q.suspend();
-
- // we start "some" jobs:
- for(int i = 0; i < NUM_JOBS; i++ ) {
- final String subTopic = TOPIC + "/sub" + (i % 10);
- final Map<String, Object> props = new HashMap<String,
Object>();
- if ( i < 10 ) {
- props.put("sleep", 300);
- } else {
- props.put("sleep", 30);
- }
- jobManager.addJob(subTopic, props);
+ // we first sent one event to get the queue started
+ jobManager.addJob(TOPIC + "/start", null);
+ assertTrue("No event received in the given time.", cb.block(5));
+ cb.reset();
+
+ // get the queue
+ final Queue q = jobManager.getQueue(QUEUE_NAME);
+ assertNotNull("Queue '" + QUEUE_NAME + "' should exist!", q);
+
+ // suspend it
+ q.suspend();
+
+ // we start "some" jobs:
+ for(int i = 0; i < NUM_JOBS; i++ ) {
+ final String subTopic = TOPIC + "/sub" + (i % 10);
+ final Map<String, Object> props = new HashMap<String, Object>();
+ if ( i < 10 ) {
+ props.put("sleep", 300);
+ } else {
+ props.put("sleep", 30);
}
- // start the queue
- q.resume();
- while ( count.get() < NUM_JOBS + 1 ) {
- assertEquals("Failed count", 0,
q.getStatistics().getNumberOfFailedJobs());
- assertEquals("Cancelled count", 0,
q.getStatistics().getNumberOfCancelledJobs());
- sleep(300);
- }
- // we started one event before the test, so add one
- assertEquals("Finished count", NUM_JOBS + 1, count.get());
- assertEquals("Finished count", NUM_JOBS + 1,
jobManager.getStatistics().getNumberOfFinishedJobs());
- assertEquals("Finished count", NUM_JOBS + 1,
q.getStatistics().getNumberOfFinishedJobs());
+ jobManager.addJob(subTopic, props);
+ }
+ // start the queue
+ q.resume();
+ while ( count.get() < NUM_JOBS + 1 ) {
assertEquals("Failed count", 0,
q.getStatistics().getNumberOfFailedJobs());
assertEquals("Cancelled count", 0,
q.getStatistics().getNumberOfCancelledJobs());
- for(int i=1; i <= MAX_PAR; i++) {
- assertTrue("# Participants " + String.valueOf(i) + " not in "
+ maxParticipants,
- maxParticipants.contains(i));
- }
- } finally {
- jc1Reg.unregister();
- jcReg.unregister();
- ehReg.unregister();
+ sleep(300);
+ }
+ // we started one event before the test, so add one
+ assertEquals("Finished count", NUM_JOBS + 1, count.get());
+ assertEquals("Finished count", NUM_JOBS + 1,
jobManager.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Finished count", NUM_JOBS + 1,
q.getStatistics().getNumberOfFinishedJobs());
+ assertEquals("Failed count", 0,
q.getStatistics().getNumberOfFailedJobs());
+ assertEquals("Cancelled count", 0,
q.getStatistics().getNumberOfCancelledJobs());
+ for(int i=1; i <= MAX_PAR; i++) {
+ assertTrue("# Participants " + String.valueOf(i) + " not in " +
maxParticipants,
+ maxParticipants.contains(i));
}
}
}