Added: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java?rev=940263&view=auto ============================================================================== --- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java (added) +++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DAOConnectionTest.java Sun May 2 17:02:51 2010 @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ode.scheduler.simple; + +import java.util.List; + +import javax.xml.namespace.QName; + +import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.bpel.iapi.Scheduler.JobType; +import org.apache.ode.dao.scheduler.JobDAO; + +import org.apache.ode.dao.scheduler.SchedulerDAOConnection; + +/** + * + * Test of the JDBC delegate. + * + * @author Maciej Szefler ( m s z e f l e r @ g m a i l . 
c o m ) + */ +public class DAOConnectionTest extends SchedulerTestBase { + + + public void testGetNodeIds() throws Exception { + SchedulerDAOConnection conn = _factory.getConnection(); + try{ + // should have no node ids in the db, empty list (not null) + _txm.begin();; + List<String> nids = conn.getNodeIds(); + _txm.commit(); + assertNotNull(nids); + assertEquals(0, nids.size()); + + // try for one nodeid + _txm.begin();; + conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L),"abc", true); + _txm.commit(); + _txm.begin();; + nids = conn.getNodeIds(); + _txm.commit(); + assertEquals(1, nids.size()); + assertTrue(nids.contains("abc")); + + // check that dups are ignored. + _txm.begin();; + conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L),"abc", true); + _txm.commit(); + _txm.begin();; + nids = conn.getNodeIds(); + _txm.commit(); + assertEquals(1, nids.size()); + assertTrue(nids.contains("abc")); + + // add another nodeid, + _txm.begin();; + conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 0L),"123", true); + _txm.commit(); + + _txm.begin();; + nids = conn.getNodeIds(); + _txm.commit(); + assertEquals(2, nids.size()); + assertTrue(nids.contains("abc")); + assertTrue(nids.contains("123")); + }finally{ + conn.close(); + } + } + + public void testReassign() throws Exception { + SchedulerDAOConnection conn = _factory.getConnection(); + try{ + _txm.begin();; + conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 100L),"n1", false); + conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, 200L),"n2", false); + _txm.commit(); + + _txm.begin();; + int num = conn.updateReassign("n1","n2"); + _txm.commit(); + + assertEquals(1,num); + + _txm.begin();; + List<JobDAO> jobs = conn.dequeueImmediate("n2", 400L, 1000); + _txm.commit(); + + assertEquals(2,jobs.size()); + }finally{ + conn.close(); + } + } + + public void testScheduleImmediateTimeFilter() throws Exception { + 
SchedulerDAOConnection conn = _factory.getConnection(); + try{ + _txm.begin();; + JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L); + String jobId1 = job.getJobId(); + conn.insertJob(job,"n1", false); + job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L); + String jobId2 = job.getJobId(); + conn.insertJob(job,"n1", false); + _txm.commit(); + + _txm.begin();; + List<JobDAO> jobs = conn.dequeueImmediate("n1", 150L, 1000); + _txm.commit(); + + assertNotNull(jobs); + assertEquals(1, jobs.size()); + assertEquals(jobId1,jobs.get(0).getJobId()); + + _txm.begin();; + jobs = conn.dequeueImmediate("n1", 250L, 1000); + _txm.commit(); + + assertNotNull(jobs); + assertEquals(1, jobs.size()); + assertEquals(jobId2,jobs.get(0).getJobId()); + }finally{ + conn.close(); + } + } + + public void testScheduleImmediateMaxRows() throws Exception { + SchedulerDAOConnection conn = _factory.getConnection(); + try{ + _txm.begin();; + JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L); + String jobId1 = job.getJobId(); + conn.insertJob(job,"n1", false); + job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L); + String jobId2 = job.getJobId(); + conn.insertJob(job,"n1", false); + _txm.commit(); + + _txm.begin();; + List<JobDAO> jobs = conn.dequeueImmediate("n1", 201L, 1); + _txm.commit(); + assertNotNull(jobs); + assertEquals(1, jobs.size()); + assertEquals(jobId1,jobs.get(0).getJobId()); + + _txm.begin();; + jobs = conn.dequeueImmediate("n1", 250L, 1000); + _txm.commit(); + assertNotNull(jobs); + assertEquals(1, jobs.size()); + assertEquals(jobId2,jobs.get(0).getJobId()); + }finally{ + conn.close(); + } + } + + public void testScheduleImmediateNodeFilter() throws Exception { + SchedulerDAOConnection conn = _factory.getConnection(); + try{ + _txm.begin();; + JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L); + String jobId1 = job.getJobId(); + conn.insertJob(job,"n1", false); + job = 
conn.createJob(true, new Scheduler.JobDetails(), true, 200L); + String jobId2 = job.getJobId(); + conn.insertJob(job,"n2", false); + _txm.commit(); + + _txm.begin();; + List<JobDAO> jobs = conn.dequeueImmediate("n2", 300L, 1000); + _txm.commit(); + + assertNotNull(jobs); + assertEquals(1, jobs.size()); + assertEquals(jobId2,jobs.get(0).getJobId()); + }finally{ + conn.close(); + } + } + + public void testDeleteJob() throws Exception { + + SchedulerDAOConnection conn = _factory.getConnection(); + try{ + _txm.begin();; + JobDAO job = conn.createJob(true, new Scheduler.JobDetails(), true, 100L); + String jobId1 = job.getJobId(); + conn.insertJob(job,"n1", false); + job = conn.createJob(true, new Scheduler.JobDetails(), true, 200L); + String jobId2 = job.getJobId(); + conn.insertJob(job,"n2", false); + _txm.commit(); + + // try deleting, wrong jobid -- del should fail + _txm.begin();; + assertFalse(conn.deleteJob("j1x", "n1")); + assertEquals(2,conn.getNodeIds().size()); + _txm.commit(); + + // wrong nodeid + _txm.begin();; + assertFalse(conn.deleteJob(jobId1, "n1x")); + assertEquals(2,conn.getNodeIds().size()); + _txm.commit(); + + // now do the correct job + _txm.begin();; + assertTrue(conn.deleteJob(jobId1, "n1")); + assertEquals(1,conn.getNodeIds().size()); + _txm.commit(); + }finally{ + conn.close(); + } + } + + public void testUpgrade() throws Exception { + + SchedulerDAOConnection conn = _factory.getConnection(); + try{ + _txm.begin();; + for (int i = 0; i < 200; ++i) + conn.insertJob(conn.createJob(true, new Scheduler.JobDetails(), true, i),null, false); + _txm.commit(); + + _txm.begin();; + int n1 = conn.updateAssignToNode("n1", 0, 3, 100); + int n2 = conn.updateAssignToNode("n2", 1, 3, 100); + int n3 = conn.updateAssignToNode("n3", 2, 3, 100); + _txm.commit(); + // Make sure we got 100 upgraded nodes + assertEquals(100,n1+n2+n3); + + // now do scheduling. 
+ _txm.begin();; + assertEquals(n1,conn.dequeueImmediate("n1", 10000L, 1000).size()); + assertEquals(n2,conn.dequeueImmediate("n2", 10000L, 1000).size()); + assertEquals(n3,conn.dequeueImmediate("n3", 10000L, 1000).size()); + _txm.commit(); + }finally{ + conn.close(); + } + } + + public void testMigration() throws Exception { + SchedulerDAOConnection conn = _factory.getConnection(); + try{ + Scheduler.JobDetails j1 = new Scheduler.JobDetails(); + j1.getDetailsExt().put("type", "MATCHER"); + j1.getDetailsExt().put("iid", 1234L); + j1.getDetailsExt().put("pid", new QName("http://test1", "test2").toString()); + j1.getDetailsExt().put("inmem", true); + j1.getDetailsExt().put("ckey", "123~abcd"); + j1.getDetailsExt().put("channel", "123"); + j1.getDetailsExt().put("mexid", "mexid123"); + j1.getDetailsExt().put("correlatorId", "cid123"); + j1.getDetailsExt().put("retryCount", "15"); + + _txm.begin();; + conn.insertJob(conn.createJob(true, j1, true, 0L), null, false); + conn.updateAssignToNode("m", 0, 3, 100); + _txm.commit(); + + _txm.begin();; + Scheduler.JobDetails j2 = conn.dequeueImmediate("m", 10000L, 1000).get(0).getDetails(); + _txm.commit(); + + assertEquals(j2.getType(), JobType.MATCHER); + assertEquals(j2.getInstanceId(), (Object) 1234L); + assertEquals(j2.getProcessId(), new QName("http://test1", "test2")); + assertEquals(j2.getInMem(), (Object) true); + assertEquals(j2.getCorrelationKey().toCanonicalString(), (Object) "123~abcd"); + assertEquals(j2.getChannel(), (Object) "123"); + assertEquals(j2.getMexId(), (Object) "mexid123"); + assertEquals(j2.getCorrelatorId(), (Object) "cid123"); + assertEquals(j2.getRetryCount(), (Object) 15); + }finally{ + conn.close(); + } + } +}
Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=940263&r1=940262&r2=940263&view=diff ============================================================================== --- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java (original) +++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java Sun May 2 17:02:51 2010 @@ -3,29 +3,26 @@ package org.apache.ode.scheduler.simple; import org.apache.ode.bpel.iapi.Scheduler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; -import javax.transaction.TransactionManager; import java.util.*; -import junit.framework.TestCase; +import org.apache.ode.dao.scheduler.SchedulerDAOConnection; /** * @author Matthieu Riou <[email protected]> */ -public class RetriesTest extends TestCase implements Scheduler.JobProcessor { +public class RetriesTest extends SchedulerTestBase implements Scheduler.JobProcessor { private static final Log __log = LogFactory.getLog(RetriesTest.class); - DelegateSupport _ds; + SimpleScheduler _scheduler; ArrayList<Scheduler.JobInfo> _jobs; ArrayList<Scheduler.JobInfo> _commit; - TransactionManager _txm; + int _tried = 0; public void setUp() throws Exception { - _txm = new GeronimoTransactionManager(); - _ds = new DelegateSupport(); + super.setUp(); _scheduler = newScheduler("n1"); _jobs = new ArrayList<Scheduler.JobInfo>(100); @@ -34,6 +31,7 @@ public class RetriesTest extends TestCas public void tearDown() throws Exception { _scheduler.shutdown(); + super.tearDown(); } public void testRetries() throws Exception { @@ -41,11 +39,14 @@ public class RetriesTest extends TestCas _scheduler.setNearFutureInterval(5000); _scheduler.setImmediateInterval(1000); 
_scheduler.start(); + + SchedulerDAOConnection conn = _factory.getConnection(); _txm.begin(); try { _scheduler.schedulePersistedJob(newDetail("123"), new Date()); } finally { _txm.commit(); + conn.close(); } Thread.sleep(10000); @@ -55,7 +56,7 @@ public class RetriesTest extends TestCas public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException { _tried++; - + __log.debug("onScheduledJob " + jobInfo.jobName); if (jobInfo.retryCount < 2) { __log.debug("retrying " + _tried); throw new Scheduler.JobProcessorException(true); @@ -71,9 +72,8 @@ public class RetriesTest extends TestCas } private SimpleScheduler newScheduler(String nodeId) { - SimpleScheduler scheduler = new SimpleScheduler(nodeId, _ds.delegate(), new Properties()); + SimpleScheduler scheduler = new SimpleScheduler(nodeId, _factory, _txm, new Properties()); scheduler.setJobProcessor(this); - scheduler.setTransactionManager(_txm); return scheduler; } Added: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java?rev=940263&view=auto ============================================================================== --- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java (added) +++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerTestBase.java Sun May 2 17:02:51 2010 @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ode.scheduler.simple; + +import java.io.InputStream; +import java.sql.Connection; + +import java.util.Properties; +import javax.transaction.TransactionManager; +import junit.framework.TestCase; +import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory; +import org.apache.ode.il.EmbeddedGeronimoFactory; +import org.apache.ode.il.config.OdeConfigProperties; +import org.apache.ode.il.dbutil.Database; +import org.apache.ode.il.txutil.TxManager; +import org.apache.ode.scheduler.simple.jdbc.SchedulerDAOConnectionFactoryImpl; + +/** + * Support class for creating a JDBC delegate (using in-mem HSQL db). + * + * @author Maciej Szefler ( m s z e f l e r @ g m a i l . 
c o m ) + */ +public class SchedulerTestBase extends TestCase { + + protected Database _db; + protected SchedulerDAOConnectionFactory _factory; + protected TransactionManager _txm; + + @Override + public void setUp() throws Exception { + Properties props = new Properties(); + props.put(OdeConfigProperties.PROP_DAOCF_SCHEDULER, System.getProperty(OdeConfigProperties.PROP_DAOCF_SCHEDULER,OdeConfigProperties.DEFAULT_DAOCF_SCHEDULER_CLASS)); + OdeConfigProperties odeProps = new OdeConfigProperties(props, ""); + TxManager tx = new TxManager(odeProps); + _txm = tx.createTransactionManager(); + _db = new Database(odeProps); + _db.setTransactionManager(_txm); + _db.start(); + _factory = _db.createDaoSchedulerCF(); + + if (_factory instanceof SchedulerDAOConnectionFactoryImpl) { + Connection c = _db.getDataSource().getConnection(); + try { + StringBuffer sql = new StringBuffer(); + + { + InputStream in = getClass().getResourceAsStream("/simplesched-h2.sql"); + int v; + while ((v = in.read()) != -1) { + sql.append((char) v); + } + } + + String[] cmds = sql.toString().split(";"); + for (String cmd : cmds) { + c.createStatement().executeUpdate(cmd); + } + } finally { + c.close(); + } + } + + } + + @Override + public void tearDown() throws Exception { + _factory.shutdown(); + _db.shutdown(); + + } + + public static long mod(long a, long b) { + return a % b; + } +} + Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java?rev=940263&r1=940262&r2=940263&view=diff ============================================================================== --- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java (original) +++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java Sun May 2 17:02:51 2010 @@ -24,14 +24,9 @@ 
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.ode.scheduler.simple.SchedulerThread; -import org.apache.ode.scheduler.simple.Task; -import org.apache.ode.scheduler.simple.TaskRunner; - - import junit.framework.TestCase; + /** * Test of SchedulerThread. * @@ -44,6 +39,7 @@ public class SchedulerThreadTest extends List<TR> _tasks = new ArrayList<TR>(100); + @Override public void setUp() throws Exception { _st = new SchedulerThread(this); } Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=940263&r1=940262&r2=940263&view=diff ============================================================================== --- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original) +++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Sun May 2 17:02:51 2010 @@ -16,207 +16,198 @@ * specific language governing permissions and limitations * under the License. 
*/ - package org.apache.ode.scheduler.simple; import java.util.*; -import javax.transaction.TransactionManager; - -import junit.framework.TestCase; - -import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.bpel.iapi.Scheduler.JobInfo; import org.apache.ode.bpel.iapi.Scheduler.JobProcessor; import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException; -public class SimpleSchedulerTest extends TestCase implements JobProcessor { +public class SimpleSchedulerTest extends SchedulerTestBase implements JobProcessor { - DelegateSupport _ds; - SimpleScheduler _scheduler; - ArrayList<JobInfo> _jobs; - TransactionManager _txm; - - - public void setUp() throws Exception { - _txm = new GeronimoTransactionManager(); - _ds = new DelegateSupport(); - - _scheduler = newScheduler("n1"); - _jobs = new ArrayList<JobInfo>(100); - } - - public void tearDown() throws Exception { - _scheduler.shutdown(); - } - - public void testConcurrentExec() throws Exception { - _scheduler.start(); - _txm.begin(); - String jobId; - try { - jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100)); - Thread.sleep(200); - // Make sure we don't schedule until commit. - assertEquals(0, _jobs.size()); - } finally { - _txm.commit(); - } - // Wait for the job to be execed. - Thread.sleep(100); - // Should execute job, - assertEquals(1, _jobs.size()); - - } - - public void testImmediateScheduling() throws Exception { - _scheduler.start(); - _txm.begin(); - try { - _scheduler.schedulePersistedJob(newDetail("123"), new Date()); - Thread.sleep(100); - // Make sure we don't schedule until commit. 
- assertEquals(0, _jobs.size()); - } finally { - _txm.commit(); - } - Thread.sleep(100); - assertEquals(1, _jobs.size()); - } - - public void testStartStop() throws Exception { - _scheduler.start(); - _txm.begin(); - try { - for (int i = 0; i < 10; ++i) - _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + (i * 100))); - } finally { - _txm.commit(); - } - Thread.sleep(100); - _scheduler.stop(); - int jobs = _jobs.size(); - assertTrue(jobs > 0); - assertTrue(jobs < 10); - Thread.sleep(200); - assertEquals(jobs, _jobs.size()); - _scheduler.start(); - Thread.sleep(1000); - assertEquals(10, _jobs.size()); - } - - public void testNearFutureScheduling() throws Exception { - // speed things up a bit to hit the right code paths - _scheduler.setNearFutureInterval(1000); - _scheduler.setImmediateInterval(500); - _scheduler.start(); - - _txm.begin(); - try { - _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750)); - } finally { - _txm.commit(); - } - - Thread.sleep(850); - assertEquals(1, _jobs.size()); - } - - public void testFarFutureScheduling() throws Exception { - // speed things up a bit to hit the right code paths - _scheduler.setNearFutureInterval(700); - _scheduler.setImmediateInterval(300); - _scheduler.start(); - - _txm.begin(); - try { - _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750)); - } finally { - _txm.commit(); - } - - Thread.sleep(850); - assertEquals(1, _jobs.size()); - } - - public void testRecovery() throws Exception { - // speed things up a bit to hit the right code paths - _scheduler.setNearFutureInterval(200); - _scheduler.setImmediateInterval(100); - _scheduler.setStaleInterval(50); - - _txm.begin(); - try { - _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis())); - _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 110)); - 
_scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250)); - } finally { - _txm.commit(); - } - - _scheduler = newScheduler("n3"); - _scheduler.setNearFutureInterval(200); - _scheduler.setImmediateInterval(100); - _scheduler.setStaleInterval(50); - _scheduler.start(); - Thread.sleep(400); - assertEquals(3, _jobs.size()); - } - - public void testRecoverySuppressed() throws Exception { - // speed things up a bit to hit the right code paths - _scheduler.setNearFutureInterval(200); - _scheduler.setImmediateInterval(100); - _scheduler.setStaleInterval(50); - - // schedule some jobs ... - _txm.begin(); - try { - _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis())); - _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 150)); - _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250)); - } finally { - _txm.commit(); - } - - // but don't start the scheduler.... - - // create a second node for the scheduler. 
- SimpleScheduler scheduler = newScheduler("n3"); - scheduler.setNearFutureInterval(200); - scheduler.setImmediateInterval(100); - scheduler.setStaleInterval(50); - scheduler.start(); - for (int i = 0; i < 40; ++i) { - scheduler.updateHeartBeat("n1"); - Thread.sleep(10); - } - - scheduler.stop(); - - assertTrue(_jobs.size() <= 1); - if (_jobs.size() == 1) - assertEquals("far", _jobs.get(0).jobDetail.getDetailsExt().get("foo")); - } - - public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException { - synchronized (_jobs) { - _jobs.add(jobInfo); - } - } - - Scheduler.JobDetails newDetail(String x) { - Scheduler.JobDetails jd = new Scheduler.JobDetails(); - jd.getDetailsExt().put("foo", x); - return jd; - } - - private SimpleScheduler newScheduler(String nodeId) { - SimpleScheduler scheduler = new SimpleScheduler(nodeId, _ds.delegate(), new Properties()); - scheduler.setJobProcessor(this); - scheduler.setTransactionManager(_txm); - return scheduler; - } - + SimpleScheduler _scheduler; + ArrayList<JobInfo> _jobs; + + public void setUp() throws Exception { + super.setUp(); + _scheduler = newScheduler("n1"); + _jobs = new ArrayList<JobInfo>(100); + } + + public void tearDown() throws Exception { + _scheduler.shutdown(); + super.tearDown(); + } + + public void testConcurrentExec() throws Exception { + _scheduler.start(); + _txm.begin(); + String jobId; + try { + jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100)); + Thread.sleep(200); + // Make sure we don't schedule until commit. + assertEquals(0, _jobs.size()); + } finally { + _txm.commit(); + } + // Wait for the job to be execed. + Thread.sleep(100); + // Should execute job, + assertEquals(1, _jobs.size()); + + } + + public void testImmediateScheduling() throws Exception { + _scheduler.start(); + _txm.begin(); + try { + _scheduler.schedulePersistedJob(newDetail("123"), new Date()); + Thread.sleep(100); + // Make sure we don't schedule until commit. 
+ assertEquals(0, _jobs.size()); + } finally { + _txm.commit(); + } + Thread.sleep(100); + assertEquals(1, _jobs.size()); + } + + public void testStartStop() throws Exception { + _scheduler.start(); + _txm.begin(); + try { + for (int i = 0; i < 10; ++i) { + _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + (i * 100))); + } + } finally { + _txm.commit(); + } + Thread.sleep(100); + _scheduler.stop(); + int jobs = _jobs.size(); + assertTrue(jobs > 0); + assertTrue(jobs < 10); + Thread.sleep(200); + assertEquals(jobs, _jobs.size()); + _scheduler.start(); + Thread.sleep(1000); + assertEquals(10, _jobs.size()); + } + + public void testNearFutureScheduling() throws Exception { + // speed things up a bit to hit the right code paths + _scheduler.setNearFutureInterval(1000); + _scheduler.setImmediateInterval(500); + _scheduler.start(); + + _txm.begin(); + try { + _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750)); + } finally { + _txm.commit(); + } + + Thread.sleep(850); + assertEquals(1, _jobs.size()); + } + + public void testFarFutureScheduling() throws Exception { + // speed things up a bit to hit the right code paths + _scheduler.setNearFutureInterval(700); + _scheduler.setImmediateInterval(300); + _scheduler.start(); + + _txm.begin(); + try { + _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750)); + } finally { + _txm.commit(); + } + + Thread.sleep(850); + assertEquals(1, _jobs.size()); + } + + public void testRecovery() throws Exception { + // speed things up a bit to hit the right code paths + _scheduler.setNearFutureInterval(200); + _scheduler.setImmediateInterval(100); + _scheduler.setStaleInterval(50); + + _txm.begin(); + try { + _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis())); + _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 110)); + 
_scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250)); + } finally { + _txm.commit(); + } + + _scheduler = newScheduler("n3"); + _scheduler.setNearFutureInterval(200); + _scheduler.setImmediateInterval(100); + _scheduler.setStaleInterval(50); + _scheduler.start(); + Thread.sleep(400); + assertEquals(3, _jobs.size()); + } + + public void testRecoverySuppressed() throws Exception { + // speed things up a bit to hit the right code paths + _scheduler.setNearFutureInterval(200); + _scheduler.setImmediateInterval(100); + _scheduler.setStaleInterval(50); + + // schedule some jobs ... + _txm.begin(); + try { + _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis())); + _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 150)); + _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250)); + } finally { + _txm.commit(); + } + + // but don't start the scheduler.... + + // create a second node for the scheduler. 
+ SimpleScheduler scheduler = newScheduler("n3"); + scheduler.setNearFutureInterval(200); + scheduler.setImmediateInterval(100); + scheduler.setStaleInterval(50); + scheduler.start(); + for (int i = 0; i < 40; ++i) { + scheduler.updateHeartBeat("n1"); + Thread.sleep(10); + } + + scheduler.stop(); + + assertTrue(_jobs.size() <= 1); + if (_jobs.size() == 1) { + assertEquals("far", _jobs.get(0).jobDetail.getDetailsExt().get("foo")); + } + } + + public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException { + synchronized (_jobs) { + _jobs.add(jobInfo); + } + } + + Scheduler.JobDetails newDetail(String x) { + Scheduler.JobDetails jd = new Scheduler.JobDetails(); + jd.getDetailsExt().put("foo", x); + return jd; + } + + private SimpleScheduler newScheduler(String nodeId) { + SimpleScheduler scheduler = new SimpleScheduler(nodeId, _factory, _txm, new Properties()); + scheduler.setJobProcessor(this); + return scheduler; + } } + Modified: ode/trunk/scheduler-simple/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/resources/log4j.properties?rev=940263&r1=940262&r2=940263&view=diff ============================================================================== --- ode/trunk/scheduler-simple/src/test/resources/log4j.properties (original) +++ ode/trunk/scheduler-simple/src/test/resources/log4j.properties Sun May 2 17:02:51 2010 @@ -16,9 +16,10 @@ # # Set root logger level to WARN and its only appender to CONSOLE -log4j.rootLogger=WARN, CONSOLE +log4j.rootLogger=WARN, CONSOLE, FILE # log4j properties to work with commandline tools. 
+log4j.category.org.apache.ode=ERROR log4j.category.org.apache.ode.scheduler.simple.RetriesTest=INFO log4j.category.org.apache.ode.bpel.engine=INFO @@ -26,3 +27,10 @@ log4j.category.org.apache.ode.bpel.engin log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%p - %C{1}.%M(%L) | %m%n + +log4j.appender.FILE=org.apache.log4j.FileAppender +log4j.appender.FILE.File=target/scheduler-test.log +log4j.appender.FILE.layout=org.apache.log4j.PatternLayout +log4j.appender.FILE.layout.ConversionPattern=%d{MM-dd@HH:mm:ss} %-5p (%13F:%L) %3x - %m%n +log4j.appender.FILE.append=false + Added: ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql?rev=940263&view=auto ============================================================================== --- ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql (added) +++ ode/trunk/scheduler-simple/src/test/resources/simplesched-h2.sql Sun May 2 17:02:51 2010 @@ -0,0 +1,22 @@ +CREATE TABLE ode_job ( + jobid varchar(64) NOT NULL DEFAULT '', + ts BIGINT NOT NULL DEFAULT 0, + nodeid varchar(64), + scheduled int NOT NULL DEFAULT 0, + transacted int NOT NULL DEFAULT 0, + + instanceId BIGINT, + mexId varchar(255), + processId varchar(255), + type varchar(255), + channel varchar(255), + correlatorId varchar(255), + correlationKey varchar(255), + retryCount int, + inMem int, + detailsExt blob(4096), + + PRIMARY KEY(jobid)); + +CREATE INDEX IDX_ODE_JOB_TS ON ode_job(ts); +CREATE INDEX IDX_ODE_JOB_NODEID ON ode_job(nodeid); \ No newline at end of file
