Modified: ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java URL: http://svn.apache.org/viewvc/ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java?view=diff&rev=564361&r1=564360&r2=564361 ============================================================================== --- ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java (original) +++ ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java Thu Aug 9 12:55:19 2007 @@ -25,7 +25,10 @@ import org.apache.ode.bpel.dao.PartnerLinkDAO; import org.apache.ode.bpel.dao.ProcessDAO; import org.apache.ode.bpel.dao.ProcessInstanceDAO; +import org.apache.ode.bpel.iapi.InvocationStyle; import org.apache.ode.bpel.iapi.MessageExchange.AckType; +import org.apache.ode.bpel.iapi.MessageExchange.FailureType; +import org.apache.ode.bpel.iapi.MessageExchange.Status; import org.apache.ode.utils.DOMUtils; import org.apache.ode.utils.uuid.UUID; import org.w3c.dom.Element; @@ -112,18 +115,17 @@ @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="CORR_ID") private CorrelatorDAOImpl _correlator; - @Basic @Column(name="ISTYLE") private String _istyle; - @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="P2P_PIPE_PEER") - private MessageExchangeDAO _pipedMex; - @Basic @Column(name="TIMEOUT") private long _timeout; @Basic @Column(name="FAILURE_TYPE") private String _failureType; + + @Basic @Column(name="PIPED_PID") + private String _pipedPid; public MessageExchangeDAOImpl() {} @@ -240,8 +242,8 @@ return _response; } - public String getStatus() { - return _status; + public Status getStatus() { + return _status == null ? null : Status.valueOf(_status); } public void setCallee(QName callee) { @@ -313,8 +315,8 @@ _response = (MessageDAOImpl)msg; } - public void setStatus(String status) { - _status = status; + public void setStatus(Status status) { + _status = status == null ? null : status.toString(); } public String getPipedMessageExchangeId() { @@ -357,32 +359,25 @@ _correlator = correlator; } - public String getInvocationStyle() { - return _istyle; + public InvocationStyle getInvocationStyle() { + return _istyle == null ? null : InvocationStyle.valueOf(_istyle); } - public MessageExchangeDAO getPipedMessageExchange() { - return _pipedMex; - } public long getTimeout() { return _timeout; } - public void setFailureType(String failureType) { - _failureType = failureType; + public void setFailureType(FailureType failureType) { + _failureType = failureType == null ? null :failureType.toString(); } - public String getFailureType() { - return _failureType; - } - - public void setInvocationStyle(String invocationStyle) { - _istyle = invocationStyle; + public FailureType getFailureType() { + return _failureType == null ? null : FailureType.valueOf(_failureType); } - public void setPipedMessageExchange(MessageExchangeDAO mex) { - _pipedMex = mex; + public void setInvocationStyle(InvocationStyle invocationStyle) { + _istyle = invocationStyle == null ? null : invocationStyle.toString(); } public void setTimeout(long timeout) { @@ -395,5 +390,13 @@ public void setAckType(AckType ackType) { _ackType = ackType == null ? null :ackType.toString(); + } + + public QName getPipedPID() { + return _pipedPid == null ? null : QName.valueOf(_pipedPid); + } + + public void setPipedPID(QName pipedPid) { + _pipedPid = pipedPid == null ? null : pipedPid.toString(); } }
Modified: ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java URL: http://svn.apache.org/viewvc/ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java?view=diff&rev=564361&r1=564360&r2=564361 ============================================================================== --- ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java (original) +++ ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java Thu Aug 9 12:55:19 2007 @@ -43,7 +43,7 @@ _ds = new DelegateSupport(); _del = _ds.delegate(); } - + public void testGetNodeIds() throws Exception { // should have no node ids in the db, empty list (not null) Modified: ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java URL: http://svn.apache.org/viewvc/ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?view=diff&rev=564361&r1=564360&r2=564361 ============================================================================== --- ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original) +++ ode/branches/bart/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Thu Aug 9 12:55:19 2007 @@ -24,25 +24,20 @@ import java.util.HashMap; import java.util.Map; -import javax.transaction.RollbackException; -import javax.transaction.Status; -import javax.transaction.Synchronization; -import javax.transaction.SystemException; import javax.transaction.TransactionManager; import junit.framework.TestCase; +import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; import org.apache.ode.bpel.iapi.Scheduler.JobInfo; import org.apache.ode.bpel.iapi.Scheduler.JobProcessor; import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException; -import org.apache.geronimo.transaction.manager.GeronimoTransactionManager; public class SimpleSchedulerTest extends TestCase implements JobProcessor { DelegateSupport _ds; SimpleScheduler _scheduler; ArrayList<JobInfo> _jobs; - ArrayList<JobInfo> _commit; TransactionManager _txm; @@ -52,7 +47,6 @@ _scheduler = newScheduler("n1"); _jobs = new ArrayList<JobInfo>(100); - _commit = new ArrayList<JobInfo>(100); } public void tearDown() throws Exception { @@ -64,21 +58,18 @@ _txm.begin(); String jobId; try { - jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 200)); - Thread.sleep(100); + jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100)); + Thread.sleep(200); // Make sure we don't schedule until commit. assertEquals(0, _jobs.size()); } finally { _txm.commit(); } - // Delete from DB - assertEquals(true,_ds.delegate().deleteJob(jobId, "n1")); // Wait for the job to be execed. - Thread.sleep(250); + Thread.sleep(100); // Should execute job, assertEquals(1, _jobs.size()); - // But should not commit. - assertEquals(0, _commit.size()); + } public void testImmediateScheduling() throws Exception { @@ -119,123 +110,102 @@ public void testNearFutureScheduling() throws Exception { // speed things up a bit to hit the right code paths - _scheduler.setNearFutureInterval(10000); - _scheduler.setImmediateInterval(5000); + _scheduler.setNearFutureInterval(1000); + _scheduler.setImmediateInterval(500); _scheduler.start(); _txm.begin(); try { - _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 7500)); + _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750)); } finally { _txm.commit(); } - Thread.sleep(8500); + Thread.sleep(850); assertEquals(1, _jobs.size()); } public void testFarFutureScheduling() throws Exception { // speed things up a bit to hit the right code paths - _scheduler.setNearFutureInterval(7000); - _scheduler.setImmediateInterval(3000); + _scheduler.setNearFutureInterval(700); + _scheduler.setImmediateInterval(300); _scheduler.start(); _txm.begin(); try { - _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 7500)); + _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750)); } finally { _txm.commit(); } - Thread.sleep(8500); + Thread.sleep(850); assertEquals(1, _jobs.size()); } public void testRecovery() throws Exception { // speed things up a bit to hit the right code paths - _scheduler.setNearFutureInterval(2000); - _scheduler.setImmediateInterval(1000); - _scheduler.setStaleInterval(500); + _scheduler.setNearFutureInterval(200); + _scheduler.setImmediateInterval(100); + _scheduler.setStaleInterval(50); _txm.begin(); try { _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis())); - _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 1100)); - _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 2500)); + _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 110)); + _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250)); } finally { _txm.commit(); } _scheduler = newScheduler("n3"); - _scheduler.setNearFutureInterval(2000); - _scheduler.setImmediateInterval(1000); - _scheduler.setStaleInterval(1000); + _scheduler.setNearFutureInterval(200); + _scheduler.setImmediateInterval(100); + _scheduler.setStaleInterval(50); _scheduler.start(); - Thread.sleep(4000); + Thread.sleep(400); assertEquals(3, _jobs.size()); } public void testRecoverySuppressed() throws Exception { // speed things up a bit to hit the right code paths - _scheduler.setNearFutureInterval(2000); - _scheduler.setImmediateInterval(1000); - _scheduler.setStaleInterval(500); + _scheduler.setNearFutureInterval(200); + _scheduler.setImmediateInterval(100); + _scheduler.setStaleInterval(50); + // schedule some jobs ... _txm.begin(); try { _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis())); - _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 1100)); - _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 2500)); + _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 150)); + _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250)); } finally { _txm.commit(); - } + } - _scheduler = newScheduler("n3"); - _scheduler.setNearFutureInterval(2000); - _scheduler.setImmediateInterval(1000); - _scheduler.setStaleInterval(1000); - _scheduler.start(); + // but don't start the scheduler.... + + // create a second node for the scheduler. + SimpleScheduler scheduler = newScheduler("n3"); + scheduler.setNearFutureInterval(200); + scheduler.setImmediateInterval(100); + scheduler.setStaleInterval(50); + scheduler.start(); for (int i = 0; i < 40; ++i) { - _scheduler.updateHeartBeat("n1"); - Thread.sleep(100); + scheduler.updateHeartBeat("n1"); + Thread.sleep(10); } - _scheduler.stop(); - Thread.sleep(1000); + scheduler.stop(); - assertEquals(0, _jobs.size()); + assertTrue(_jobs.size() <= 1); + if (_jobs.size() == 1) + assertEquals("far", _jobs.get(0).jobDetail.get("foo")); } public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException { synchronized (_jobs) { _jobs.add(jobInfo); } - - try { - _txm.getTransaction().registerSynchronization(new Synchronization() { - - public void afterCompletion(int arg0) { - if (arg0 == Status.STATUS_COMMITTED) - _commit.add(jobInfo); - } - - public void beforeCompletion() { - // TODO Auto-generated method stub - - } - - }); - } catch (IllegalStateException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (RollbackException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (SystemException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } Map<String, Object> newDetail(String x) {
