Author: rr
Date: Tue Jun 30 08:28:30 2009
New Revision: 789611
URL: http://svn.apache.org/viewvc?rev=789611&view=rev
Log:
ODE-626: Addressed suggestions #1, #3, #4 from Alexis
Removed:
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
Modified:
ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
Modified:
ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL:
http://svn.apache.org/viewvc/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java (original)
+++ ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java Tue Jun 30 08:28:30 2009
@@ -97,30 +97,7 @@
INVOKE_CHECK
}
- public interface JobDetails {
- public Long getInstanceId();
- public void setInstanceId(Long iid);
- public String getMexId();
- public void setMexId(String mexId);
- public QName getProcessId();
- public void setProcessId(QName processId);
- public JobType getType();
- public void setType(JobType type);
- public String getChannel();
- public void setChannel(String channel);
- public String getCorrelatorId();
- public void setCorrelatorId(String correlatorId);
- public CorrelationKey getCorrelationKey();
- public void setCorrelationKey(CorrelationKey correlationKey);
- public Integer getRetryCount();
- public void setRetryCount(Integer retryCount);
- public Boolean getInMem();
- public void setInMem(Boolean inMem);
- public Map<String, Object> getDetailsExt();
- public void setDetailsExt(Map<String, Object> detailsExt);
- }
-
- public static class JobDetailsImpl implements Scheduler.JobDetails {
+ public static class JobDetails {
public Long instanceId;
public String mexId;
public String processId;
@@ -195,7 +172,7 @@
@Override
public String toString() {
- return "JobDetailsImpl("
+ return "JobDetails("
+ " instanceId: " + instanceId
+ " mexId: " + mexId
+ " processId: " + processId
Modified:
ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java (original)
+++ ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessConfImpl.java Tue Jun 30 08:28:30 2009
@@ -50,7 +50,6 @@
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
-import org.apache.ode.bpel.iapi.Scheduler.JobDetailsImpl;
import org.apache.ode.bpel.rapi.ProcessModel;
import org.apache.ode.store.DeploymentUnitDir.CBPInfo;
import org.apache.ode.utils.CronExpression;
@@ -468,7 +467,7 @@
cleanupInfo.setFilters(aCleanup.getFilterList());
ProcessCleanupConfImpl.processACleanup(cleanupInfo.getCategories(),
aCleanup.getCategoryList());
- JobDetails runnableDetails = new JobDetailsImpl();
+ JobDetails runnableDetails = new JobDetails();
runnableDetails.getDetailsExt().put("cleanupInfo",
cleanupInfo);
runnableDetails.getDetailsExt().put("pid", _pid);
runnableDetails.getDetailsExt().put("transactionSize", 10);
Modified:
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue Jun 30 08:28:30 2009
@@ -581,22 +581,22 @@
}
public void registerTimer(String timerChannelId, Date timeToFire) {
- WorkEvent we = new WorkEvent();
- we.setInstanceId(_dao.getInstanceId());
- we.setProcessId(_bpelProcess.getPID());
- we.setChannel(timerChannelId);
- we.setType(Scheduler.JobType.TIMER);
- _bpelProcess.scheduleWorkEvent(we, timeToFire);
+ JobDetails j = new JobDetails();
+ j.setInstanceId(_dao.getInstanceId());
+ j.setProcessId(_bpelProcess.getPID());
+ j.setChannel(timerChannelId);
+ j.setType(Scheduler.JobType.TIMER);
+ _bpelProcess.scheduleJob(j, timeToFire);
}
private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey
key) {
- WorkEvent we = new WorkEvent();
- we.setInstanceId(_dao.getInstanceId());
- we.setProcessId(_bpelProcess.getPID());
- we.setType(Scheduler.JobType.MATCHER);
- we.setCorrelatorId(correlatorId);
- we.setCorrelationKey(key);
- _bpelProcess.scheduleWorkEvent(we, null);
+ JobDetails j = new JobDetails();
+ j.setInstanceId(_dao.getInstanceId());
+ j.setProcessId(_bpelProcess.getPID());
+ j.setType(Scheduler.JobType.MATCHER);
+ j.setCorrelatorId(correlatorId);
+ j.setCorrelationKey(key);
+ _bpelProcess.scheduleJob(j, null);
}
public String invoke(String requestId, PartnerLink partnerLink, Operation
operation, Element outgoingMessage)
@@ -774,12 +774,12 @@
__log.debug("MaxTime exceeded for instance # " + _iid);
try {
- WorkEvent we = new WorkEvent();
- we.setInstanceId(_iid);
- we.setRetryCount(_retryCount);
- we.setProcessId(_bpelProcess.getPID());
- we.setType(Scheduler.JobType.RESUME);
- _contexts.scheduler.schedulePersistedJob(we.getDetails(),
new Date());
+ JobDetails j = new JobDetails();
+ j.setInstanceId(_iid);
+ j.setRetryCount(_retryCount);
+ j.setProcessId(_bpelProcess.getPID());
+ j.setType(Scheduler.JobType.RESUME);
+ _contexts.scheduler.schedulePersistedJob(j, new Date());
} catch (ContextException e) {
__log.error("Failed to schedule resume task.", e);
throw new BpelEngineException(e);
Modified:
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Jun 30 08:28:30 2009
@@ -46,6 +46,7 @@
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.extension.ExtensionBundleRuntime;
import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
@@ -522,8 +523,8 @@
public void onScheduledJob(final JobInfo jobInfo) throws
JobProcessorException {
_mngmtLock.readLock().lock();
try {
- final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
- ODEProcess process = _registeredProcesses.get(we.getProcessId());
+ final JobDetails j = jobInfo.jobDetail;
+ ODEProcess process = _registeredProcesses.get(j.getProcessId());
if (process == null) {
// If the process is not active, it means that we should not be
// doing any work on its behalf, therefore we will reschedule
the
@@ -532,8 +533,8 @@
public Void call() throws Exception {
_contexts.scheduler.jobCompleted(jobInfo.jobName);
Date future = new Date(System.currentTimeMillis() +
(60 * 1000));
-
__log.debug(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(),
jobInfo.jobName, future));
-
_contexts.scheduler.schedulePersistedJob(we.getDetails(), future);
+
__log.debug(__msgs.msgReschedulingJobForInactiveProcess(j.getProcessId(),
jobInfo.jobName, future));
+ _contexts.scheduler.schedulePersistedJob(j, future);
return null;
}
@@ -541,12 +542,12 @@
return;
}
- if (we.getType().equals(Scheduler.JobType.INVOKE_CHECK)) {
- if (__log.isDebugEnabled()) __log.debug("handleWorkEvent:
InvokeCheck event for mexid " + we.getMexId());
+ if (j.getType().equals(Scheduler.JobType.INVOKE_CHECK)) {
+ if (__log.isDebugEnabled()) __log.debug("handleWorkEvent:
InvokeCheck event for mexid " + j.getMexId());
- PartnerRoleMessageExchange mex = (PartnerRoleMessageExchange)
getMessageExchange(we.getMexId());
+ PartnerRoleMessageExchange mex = (PartnerRoleMessageExchange)
getMessageExchange(j.getMexId());
if (mex.getStatus() == MessageExchange.Status.ASYNC ||
mex.getStatus() == MessageExchange.Status.ACK) {
- String msg = "No response received for invoke (mexId=" +
we.getMexId() + "), forcing it into a failed state.";
+ String msg = "No response received for invoke (mexId=" +
j.getMexId() + "), forcing it into a failed state.";
if (__log.isDebugEnabled()) __log.debug(msg);
mex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, msg,
null);
}
Modified:
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
URL:
http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java Tue Jun 30 08:28:30 2009
@@ -172,11 +172,11 @@
__log.debug("step(" + iid + ") adding step indicator
to table.");
_step.add(iid);
- WorkEvent we = new WorkEvent();
- we.setInstanceId(iid);
- we.setProcessId(_process.getPID());
- we.setType(Scheduler.JobType.RESUME);
-
_process._contexts.scheduler.schedulePersistedJob(we.getDetails(), null);
+ JobDetails j = new JobDetails();
+ j.setInstanceId(iid);
+ j.setProcessId(_process.getPID());
+ j.setType(Scheduler.JobType.RESUME);
+ _process._contexts.scheduler.schedulePersistedJob(j,
null);
return true;
}
@@ -296,11 +296,11 @@
_process.saveEvent(evt, instance);
onEvent(evt);
- WorkEvent we = new WorkEvent();
- we.setType(JobType.RESUME);
- we.setProcessId(_process.getPID());
- we.setInstanceId(iid);
-
_process._contexts.scheduler.schedulePersistedJob(we.getDetails(), null);
+ JobDetails j = new JobDetails();
+ j.setType(JobType.RESUME);
+ j.setProcessId(_process.getPID());
+ j.setInstanceId(iid);
+ _process._contexts.scheduler.schedulePersistedJob(j,
null);
return true;
}
Modified:
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
URL:
http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java Tue Jun 30 08:28:30 2009
@@ -42,6 +42,7 @@
import org.apache.ode.bpel.engine.extvar.ExternalVariableManager;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.intercept.FailMessageExchangeException;
import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
@@ -350,15 +351,15 @@
markused();
- final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
+ final JobDetails j = jobInfo.jobDetail;
if (__log.isDebugEnabled()) {
__log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent",
new Object[] { "jobInfo", jobInfo }));
}
- enqueueInstanceTransaction(we.getInstanceId(), new Runnable() {
+ enqueueInstanceTransaction(j.getInstanceId(), new Runnable() {
public void run() {
_contexts.scheduler.jobCompleted(jobInfo.jobName);
- execInstanceEvent(we);
+ execInstanceEvent(j);
}
});
@@ -401,41 +402,41 @@
return state;
}
- private void execInstanceEvent(WorkEvent we) {
- BpelInstanceWorker worker =
_instanceWorkerCache.get(we.getInstanceId());
+ private void execInstanceEvent(JobDetails j) {
+ BpelInstanceWorker worker =
_instanceWorkerCache.get(j.getInstanceId());
assert worker.isWorkerThread();
- ProcessInstanceDAO instanceDAO =
getProcessDAO().getInstance(we.getInstanceId());
- MessageExchangeDAO mexDao = we.getMexId() == null ? null :
loadMexDao(we.getMexId());
+ ProcessInstanceDAO instanceDAO =
getProcessDAO().getInstance(j.getInstanceId());
+ MessageExchangeDAO mexDao = j.getMexId() == null ? null :
loadMexDao(j.getMexId());
if (instanceDAO == null) {
if (__log.isDebugEnabled()) {
- __log.debug("handleWorkEvent: no ProcessInstance found with
iid " + we.getInstanceId() + "; ignoring.");
+ __log.debug("handleWorkEvent: no ProcessInstance found with
iid " + j.getInstanceId() + "; ignoring.");
}
return;
}
if (__log.isDebugEnabled()) {
- __log.debug("handleWorkEvent: " + we.getType() + " event for
process instance " + we.getInstanceId());
+ __log.debug("handleWorkEvent: " + j.getType() + " event for
process instance " + j.getInstanceId());
}
- switch (we.getType()) {
+ switch (j.getType()) {
case MYROLE_INVOKE:
executeContinueInstanceMyRoleRequestReceived(mexDao);
if(__log.isDebugEnabled()) __log.debug("handleWorkEvent: releasing
myrole mex dao: " + mexDao);
mexDao.release(true);
break;
case TIMER:
- executeContinueInstanceTimerReceived(instanceDAO, we.getChannel());
+ executeContinueInstanceTimerReceived(instanceDAO, j.getChannel());
break;
case RESUME:
- executeContinueInstanceResume(instanceDAO, we.getRetryCount());
+ executeContinueInstanceResume(instanceDAO, j.getRetryCount());
break;
case PARTNER_RESPONSE:
executeContinueInstancePartnerRoleResponseReceived(mexDao);
break;
case MATCHER:
- executeContinueInstanceMatcherEvent(instanceDAO,
we.getCorrelatorId(), we.getCorrelationKey());
+ executeContinueInstanceMatcherEvent(instanceDAO,
j.getCorrelatorId(), j.getCorrelationKey());
break;
}
}
@@ -761,11 +762,11 @@
}
}
- public String scheduleWorkEvent(WorkEvent we, Date timeToFire) {
+ public String scheduleJob(JobDetails jd, Date timeToFire) {
// if (isInMemory())
// throw new InvalidProcessException("In-mem process execution
resulted in event scheduling.");
- return _contexts.scheduler.schedulePersistedJob(we.getDetails(),
timeToFire);
+ return _contexts.scheduler.schedulePersistedJob(jd, timeToFire);
}
protected OdeRuntime buildRuntime(int modelVersion) {
Modified:
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java
URL:
http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODERESTProcess.java Tue Jun 30 08:28:30 2009
@@ -1,6 +1,7 @@
package org.apache.ode.bpel.engine;
import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.rapi.ResourceModel;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
@@ -135,14 +136,14 @@
}
});
} else /* non-transacted style */ {
- WorkEvent we = new WorkEvent();
- we.setType(Scheduler.JobType.MYROLE_INVOKE);
- we.setInstanceId(mexdao.getInstance().getInstanceId());
- we.setMexId(mexdao.getMessageExchangeId());
+ JobDetails j = new JobDetails();
+ j.setType(Scheduler.JobType.MYROLE_INVOKE);
+ j.setInstanceId(mexdao.getInstance().getInstanceId());
+ j.setMexId(mexdao.getMessageExchangeId());
// Could be different to this pid when routing to an older
version
-
we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
+
j.setProcessId(mexdao.getInstance().getProcess().getProcessId());
- scheduleWorkEvent(we, null);
+ scheduleJob(j, null);
}
// Cleaning up
_contexts.dao.getConnection().deleteResourceRoute(urlMeth[0],
urlMeth[1]);
Modified:
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java
URL:
http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ODEWSProcess.java Tue Jun 30 08:28:30 2009
@@ -5,6 +5,7 @@
import org.apache.ode.bpel.rapi.ConstantsModel;
import org.apache.ode.bpel.rapi.InvalidProcessException;
import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.MessageDAO;
@@ -408,14 +409,14 @@
} else if (istyle == InvocationStyle.P2P_TRANSACTED) /*
transact p2p invoke in the same thread */ {
executeContinueInstanceMyRoleRequestReceived(mexdao);
} else /* non-transacted style */{
- WorkEvent we = new WorkEvent();
- we.setType(JobType.MYROLE_INVOKE);
- we.setInstanceId(mexdao.getInstance().getInstanceId());
- we.setMexId(mexdao.getMessageExchangeId());
+ JobDetails j = new JobDetails();
+ j.setType(JobType.MYROLE_INVOKE);
+ j.setInstanceId(mexdao.getInstance().getInstanceId());
+ j.setMexId(mexdao.getMessageExchangeId());
// Could be different to this pid when routing to an older
version
-
we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
+
j.setProcessId(mexdao.getInstance().getProcess().getProcessId());
- scheduleWorkEvent(we, null);
+ scheduleJob(j, null);
}
} else if (cstatus ==
MyRoleMessageExchange.CorrelationStatus.QUEUED) {
; // do nothing
@@ -671,15 +672,15 @@
org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
if (!isInMemory() && isTwoWay) {
if (__log.isDebugEnabled()) __log.debug("Creating invocation check
event for mexid " + mex.getMessageExchangeId());
- WorkEvent event = new WorkEvent();
- event.setMexId(mex.getMessageExchangeId());
- event.setProcessId(getPID());
- event.setType(JobType.INVOKE_CHECK);
+ JobDetails job = new JobDetails();
+ job.setMexId(mex.getMessageExchangeId());
+ job.setProcessId(getPID());
+ job.setType(JobType.INVOKE_CHECK);
// use a greater timeout to make sure the check job does not get
executed while the service invocation is still waiting for a response
PartnerLinkModel model =
_processModel.getPartnerLink(mex.getPartnerLinkModelId());
long timeout = (long) (getTimeout(model)*1.5);
Date future = new Date(System.currentTimeMillis() + timeout);
- String jobId = scheduleWorkEvent(event, future);
+ String jobId = scheduleJob(job, future);
mex.setProperty("invokeCheckJobId", jobId);
}
}
Modified:
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Tue Jun 30 08:28:30 2009
@@ -7,6 +7,7 @@
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.bpel.rapi.PartnerLinkModel;
@@ -48,9 +49,9 @@
assert !_process.isInMemory() : "resumeInstance() for reliable in-mem
processes makes no sense.";
MessageExchangeDAO mexdao = getDAO();
- final WorkEvent we = generatePartnerResponseWorkEvent(mexdao);
+ final JobDetails j = generatePartnerResponseJob(mexdao);
save(mexdao);
- _contexts.scheduler.schedulePersistedJob(we.getDetails(), null);
+ _contexts.scheduler.schedulePersistedJob(j, null);
}
@@ -59,14 +60,14 @@
return InvocationStyle.RELIABLE;
}
- private WorkEvent generatePartnerResponseWorkEvent(MessageExchangeDAO
mexdao) {
- WorkEvent we = new WorkEvent();
- we.setProcessId(_process.getPID());
- we.setChannel(mexdao.getChannel());
- we.setInstanceId(_iid);
- we.setMexId(_mexId);
- we.setType(JobType.PARTNER_RESPONSE);
- return we;
+ private JobDetails generatePartnerResponseJob(MessageExchangeDAO mexdao) {
+ JobDetails j = new JobDetails();
+ j.setProcessId(_process.getPID());
+ j.setChannel(mexdao.getChannel());
+ j.setInstanceId(_iid);
+ j.setMexId(_mexId);
+ j.setType(JobType.PARTNER_RESPONSE);
+ return j;
}
}
Modified:
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java
URL:
http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java Tue Jun 30 08:28:30 2009
@@ -16,7 +16,6 @@
import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
-import org.apache.ode.bpel.iapi.Scheduler.JobDetailsImpl;
import org.apache.ode.store.ProcessCleanupConfImpl;
import org.apache.ode.utils.CronExpression;
import org.apache.xmlbeans.XmlOptions;
@@ -66,7 +65,7 @@
cleanupInfo.setFilters(aCleanup.getFilterList());
ProcessCleanupConfImpl.processACleanup(cleanupInfo.getCategories(),
aCleanup.getCategoryList());
- JobDetails runnableDetails = new JobDetailsImpl();
+ JobDetails runnableDetails = new JobDetails();
runnableDetails.getDetailsExt().put("cleanupInfo",
cleanupInfo);
runnableDetails.getDetailsExt().put("transactionSize",
10);
Modified:
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java Tue Jun 30 08:28:30 2009
@@ -34,9 +34,10 @@
import java.util.Map;
import javax.sql.DataSource;
+import javax.xml.namespace.QName;
import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.Scheduler.JobDetailsImpl;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.utils.DbIsolation;
import org.apache.commons.logging.Log;
@@ -188,7 +189,7 @@
ps.setInt(i++, asInteger(loaded));
ps.setInt(i++, asInteger(job.transacted));
- JobDetailsImpl details = (JobDetailsImpl) job.detail;
+ JobDetails details = job.detail;
ps.setObject(i++, details.instanceId, Types.BIGINT);
ps.setObject(i++, details.mexId, Types.VARCHAR);
ps.setObject(i++, details.processId, Types.VARCHAR);
@@ -235,7 +236,7 @@
ResultSet rs = ps.executeQuery();
while (rs.next()) {
- Scheduler.JobDetailsImpl details = new
Scheduler.JobDetailsImpl();
+ Scheduler.JobDetails details = new Scheduler.JobDetails();
details.instanceId = (Long) rs.getObject("instanceId");
details.mexId = (String) rs.getObject("mexId");
details.processId = (String) rs.getObject("processId");
@@ -255,6 +256,41 @@
}
}
+ {
+ //For compatibility reasons, we check whether there are
entries inside
+ //jobDetailsExt blob, which correspond to extracted
entries. If so, we
+ //use them.
+
+ Map<String, Object> detailsExt = details.getDetailsExt();
+ if (detailsExt.get("type") != null) {
+ details.type = (String) detailsExt.get("type");
+ }
+ if (detailsExt.get("iid") != null) {
+ details.instanceId = (Long) detailsExt.get("iid");
+ }
+ if (detailsExt.get("pid") != null) {
+ details.processId = (String) detailsExt.get("pid");
+ }
+ if (detailsExt.get("inmem") != null) {
+ details.inMem = (Boolean) detailsExt.get("inmem");
+ }
+ if (detailsExt.get("ckey") != null) {
+ details.correlationKey = (String)
detailsExt.get("ckey");
+ }
+ if (detailsExt.get("channel") != null) {
+ details.channel = (String) detailsExt.get("channel");
+ }
+ if (detailsExt.get("mexid") != null) {
+ details.mexId = (String) detailsExt.get("mexid");
+ }
+ if (detailsExt.get("correlatorId") != null) {
+ details.correlatorId = (String)
detailsExt.get("correlatorId");
+ }
+ if (detailsExt.get("retryCount") != null) {
+ details.retryCount = Integer.parseInt((String)
detailsExt.get("retryCount"));
+ }
+ }
+
Job job = new Job(rs.getLong(2), rs.getString(1),
asBoolean(rs.getInt(3)), details);
ret.add(job);
}
Modified:
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Tue Jun 30 08:28:30 2009
@@ -192,7 +192,7 @@
if (when == null)
when = new Date(ctime);
- JobDetails jobDetails = new JobDetailsImpl();
+ JobDetails jobDetails = new JobDetails();
jobDetails.getDetailsExt().put("runnable", runnable);
runnable.storeToDetails(jobDetails);
Modified:
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java Tue Jun 30 08:28:30 2009
@@ -22,7 +22,10 @@
import java.util.HashMap;
import java.util.List;
+import javax.xml.namespace.QName;
+
import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.scheduler.simple.DatabaseDelegate;
import org.apache.ode.scheduler.simple.Job;
@@ -53,19 +56,19 @@
assertEquals(0, nids.size());
// try for one nodeid
- _del.insertJob(new Job(0L,true,new Scheduler.JobDetailsImpl()), "abc",
true);
+ _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "abc",
true);
nids = _del.getNodeIds();
assertEquals(1, nids.size());
assertTrue(nids.contains("abc"));
// check that dups are ignored.
- _del.insertJob(new Job(0L,true,new Scheduler.JobDetailsImpl()), "abc",
true);
+ _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "abc",
true);
nids = _del.getNodeIds();
assertEquals(1, nids.size());
assertTrue(nids.contains("abc"));
// add another nodeid,
- _del.insertJob(new Job(0L,true,new Scheduler.JobDetailsImpl()), "123",
true);
+ _del.insertJob(new Job(0L,true,new Scheduler.JobDetails()), "123",
true);
nids = _del.getNodeIds();
assertEquals(2, nids.size());
assertTrue(nids.contains("abc"));
@@ -73,8 +76,8 @@
}
public void testReassign() throws Exception {
- _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetailsImpl()),
"n1", false);
- _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetailsImpl()),
"n2", false);
+ _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()),
"n1", false);
+ _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()),
"n2", false);
assertEquals(1,_del.updateReassign("n1","n2"));
List<Job> jobs = _del.dequeueImmediate("n2", 400L, 1000);
@@ -82,8 +85,8 @@
}
public void testScheduleImmediateTimeFilter() throws Exception {
- _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetailsImpl()),
"n1", false);
- _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetailsImpl()),
"n1", false);
+ _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()),
"n1", false);
+ _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()),
"n1", false);
List<Job> jobs = _del.dequeueImmediate("n1", 150L, 1000);
@@ -97,8 +100,8 @@
}
public void testScheduleImmediateMaxRows() throws Exception {
- _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetailsImpl()),
"n1", false);
- _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetailsImpl()),
"n1", false);
+ _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()),
"n1", false);
+ _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()),
"n1", false);
List<Job> jobs = _del.dequeueImmediate("n1", 201L, 1);
assertNotNull(jobs);
@@ -111,8 +114,8 @@
}
public void testScheduleImmediateNodeFilter() throws Exception {
- _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetailsImpl()),
"n1", false);
- _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetailsImpl()),
"n2", false);
+ _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()),
"n1", false);
+ _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()),
"n2", false);
List<Job> jobs = _del.dequeueImmediate("n2", 300L, 1000);
assertNotNull(jobs);
@@ -121,8 +124,8 @@
}
public void testDeleteJob() throws Exception {
- _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetailsImpl()),
"n1", false);
- _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetailsImpl()),
"n2", false);
+ _del.insertJob(new Job(100L,"j1",true,new Scheduler.JobDetails()),
"n1", false);
+ _del.insertJob(new Job(200L,"j2",true,new Scheduler.JobDetails()),
"n2", false);
// try deleting, wrong jobid -- del should fail
assertFalse(_del.deleteJob("j1x", "n1"));
@@ -139,7 +142,7 @@
public void testUpgrade() throws Exception {
for (int i = 0; i < 200; ++i)
- _del.insertJob(new Job(i ,"j" +i,true,new
Scheduler.JobDetailsImpl()), null, false);
+ _del.insertJob(new Job(i ,"j" +i,true,new Scheduler.JobDetails()),
null, false);
int n1 = _del.updateAssignToNode("n1", 0, 3, 100);
int n2 = _del.updateAssignToNode("n2", 1, 3, 100);
@@ -153,4 +156,30 @@
assertEquals(n3,_del.dequeueImmediate("n3", 10000L, 1000).size());
}
+ public void testMigration() throws Exception {
+ Scheduler.JobDetails j1 = new Scheduler.JobDetails();
+ j1.getDetailsExt().put("type", "MATCHER");
+ j1.getDetailsExt().put("iid", 1234L);
+ j1.getDetailsExt().put("pid", new QName("http://test1",
"test2").toString());
+ j1.getDetailsExt().put("inmem", true);
+ j1.getDetailsExt().put("ckey", "123~abcd");
+ j1.getDetailsExt().put("channel", "123");
+ j1.getDetailsExt().put("mexid", "mexid123");
+ j1.getDetailsExt().put("correlatorId", "cid123");
+ j1.getDetailsExt().put("retryCount", "15");
+
+ _del.insertJob(new Job(0 ,"migration",true,j1), null, false);
+ _del.updateAssignToNode("m", 0, 3, 100);
+ Scheduler.JobDetails j2 = _del.dequeueImmediate("m", 10000L,
1000).get(0).detail;
+
+ assertEquals(j2.getType(), JobType.MATCHER);
+ assertEquals(j2.getInstanceId(), (Object) 1234L);
+ assertEquals(j2.getProcessId(), new QName("http://test1", "test2"));
+ assertEquals(j2.getInMem(), (Object) true);
+ assertEquals(j2.getCorrelationKey().toCanonicalString(), (Object)
"123~abcd");
+ assertEquals(j2.getChannel(), (Object) "123");
+ assertEquals(j2.getMexId(), (Object) "mexid123");
+ assertEquals(j2.getCorrelatorId(), (Object) "cid123");
+ assertEquals(j2.getRetryCount(), (Object) 15);
+ }
}
Modified:
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java Tue Jun 30 08:28:30 2009
@@ -65,7 +65,7 @@
}
Scheduler.JobDetails newDetail(String x) {
- Scheduler.JobDetails jd = new Scheduler.JobDetailsImpl();
+ Scheduler.JobDetails jd = new Scheduler.JobDetails();
jd.getDetailsExt().put("foo", x);
return jd;
}
Modified:
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=789611&r1=789610&r2=789611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Tue Jun 30 08:28:30 2009
@@ -207,7 +207,7 @@
}
Scheduler.JobDetails newDetail(String x) {
- Scheduler.JobDetails jd = new Scheduler.JobDetailsImpl();
+ Scheduler.JobDetails jd = new Scheduler.JobDetails();
jd.getDetailsExt().put("foo", x);
return jd;
}