Author: rr
Date: Wed Jun 30 22:33:12 2010
New Revision: 959463
URL: http://svn.apache.org/viewvc?rev=959463&view=rev
Log:
Second matcher + logging fix for job retries
Added:
ode/trunk/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
Modified:
ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
Modified: ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL:
http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
(original)
+++ ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java Wed Jun
30 22:33:12 2010
@@ -25,7 +25,9 @@ import java.util.Map;
import javax.transaction.TransactionManager;
import javax.wsdl.Definition;
+import javax.wsdl.Operation;
import javax.wsdl.Port;
+import javax.wsdl.PortType;
import javax.wsdl.Service;
import javax.wsdl.extensions.UnknownExtensibilityElement;
import javax.wsdl.extensions.http.HTTPAddress;
@@ -112,7 +114,9 @@ public class ODEService {
String messageId = new GUID().toString();
odeMex = _server.getEngine().createMessageExchange("" + messageId,
_serviceName,
msgContext.getAxisOperation().getName().getLocalPart());
- __log.debug("ODE routed to operation " + odeMex.getOperation() + "
from service " + _serviceName);
+ __log.debug("ODE routed to portType " + odeMex.getPortType() + "
operation " + odeMex.getOperation() + " from service " + _serviceName);
+ PortType portType = odeMex.getPortType();
+ Operation operation = odeMex.getOperation();
odeMex.setProperty("isTwoWay",
Boolean.toString(msgContext.getAxisOperation() instanceof
TwoChannelAxisOperation));
if (odeMex.getOperation() != null) {
// Preparing message to send to ODE
Modified:
ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL:
http://svn.apache.org/viewvc/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
(original)
+++ ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
Wed Jun 30 22:33:12 2010
@@ -176,7 +176,8 @@ public interface Scheduler {
INVOKE_INTERNAL,
INVOKE_RESPONSE,
MATCHER,
- INVOKE_CHECK
+ INVOKE_CHECK,
+ MEX_MATCHER
}
public static class JobDetails {
Modified:
ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
URL:
http://svn.apache.org/viewvc/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
---
ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
(original)
+++
ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
Wed Jun 30 22:33:12 2010
@@ -252,4 +252,6 @@ public interface MessageExchangeDAO {
* Deletes messages that arrived before the route is setup
*/
void releasePremieMessages();
+
+ boolean lockPremieMessages();
}
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
---
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
(original)
+++
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Wed Jun 30 22:33:12 2010
@@ -247,12 +247,15 @@ public class BpelEngineImpl implements B
case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
mex = new MyRoleMessageExchangeImpl(process, this, mexdao);
if (process != null) {
- OPartnerLink plink = (OPartnerLink)
process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
- // the partner link might not be hydrated
- if (plink != null) {
- PortType ptype = plink.myRolePortType;
- Operation op =
plink.getMyRoleOperation(mexdao.getOperation());
- mex.setPortOp(ptype, op);
+ Object child =
process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
+ if (child instanceof OPartnerLink) {
+ OPartnerLink plink = (OPartnerLink) child;
+ // the partner link might not be hydrated
+ if (plink != null) {
+ PortType ptype = plink.myRolePortType;
+ Operation op =
plink.getMyRoleOperation(mexdao.getOperation());
+ mex.setPortOp(ptype, op);
+ }
}
}
break;
@@ -473,14 +476,14 @@ public class BpelEngineImpl implements B
}
}
}
- if (we.getType().equals(JobType.INVOKE_INTERNAL)) {
+ if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() ==
JobType.MEX_MATCHER) {
List<BpelProcess> processes =
getAllProcesses(we.getProcessId());
boolean routed = false;
jobInfo.jobDetail.detailsExt.put("enqueue", false);
for(BpelProcess proc : processes) {
routed = routed ||
proc.handleJobDetails(jobInfo.jobDetail);
}
- if(!routed) {
+ if(!routed && we.getType() == JobType.INVOKE_INTERNAL) {
jobInfo.jobDetail.detailsExt.put("enqueue", true);
process.handleJobDetails(jobInfo.jobDetail);
}
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL:
http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
---
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
(original)
+++
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
Wed Jun 30 22:33:12 2010
@@ -414,7 +414,7 @@ public class BpelProcess {
* @see
org.apache.ode.bpel.engine.BpelProcess#handleJobDetails(java.util.Map<java.lang.String,java.lang.Object>)
*/
public boolean handleJobDetails(JobDetails jobData) {
- boolean ret = true;
+ boolean routed = true;
try {
_hydrationLatch.latch(1);
markused();
@@ -425,12 +425,20 @@ public class BpelProcess {
JobDetails we = jobData;
// Process level events
- if (we.getType().equals(JobType.INVOKE_INTERNAL)) {
+ if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() ==
JobType.MEX_MATCHER) {
if (__log.isDebugEnabled()) {
- __log.debug("InvokeInternal event for mexid " +
we.getMexId());
+ __log.debug(we.getType() + " event for mexid " +
we.getMexId());
}
MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl)
_engine.getMessageExchange(we.getMexId());
- ret = invokeProcess(mex, (Boolean)
jobData.detailsExt.get("enqueue"));
+ if (we.getType() == JobType.MEX_MATCHER &&
!mex.getDAO().lockPremieMessages()) {
+ //Skip if already processed
+ return true;
+ }
+
+ routed = invokeProcess(mex, (Boolean)
jobData.detailsExt.get("enqueue"));
+ if (we.getType() == JobType.MEX_MATCHER && routed) {
+ mex.getDAO().releasePremieMessages();
+ }
} else {
// Instance level events
ProcessInstanceDAO procInstance =
getProcessDAO().getInstance(we.getInstanceId());
@@ -477,7 +485,7 @@ public class BpelProcess {
} finally {
_hydrationLatch.release(1);
}
- return ret;
+ return routed;
}
private void setRoles(OProcess oprocess) {
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
---
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
(original)
+++
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
Wed Jun 30 22:33:12 2010
@@ -45,6 +45,8 @@ import org.apache.ode.bpel.iapi.Endpoint
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessState;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.o.OMessageVarType;
import org.apache.ode.bpel.o.OPartnerLink;
@@ -274,6 +276,18 @@ public class PartnerLinkMyRoleImpl exten
// No match, means we add message exchange to the queue.
routing.correlator.enqueueMessage(mex.getDAO(),
routing.wholeKeySet);
+
+ // Second matcher needs to be registered here
+ JobDetails we = new JobDetails();
+ we.setType(JobType.MEX_MATCHER);
+ we.setProcessId(_process.getPID());
+ we.setMexId(mex.getMessageExchangeId());
+ we.setInMem(_process.isInMemory());
+ if(_process.isInMemory()){
+
_process._engine._contexts.scheduler.scheduleVolatileJob(true, we);
+ }else{
+
_process._engine._contexts.scheduler.schedulePersistedJob(we, null);
+ }
}
}
}
Modified:
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
---
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
(original)
+++
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
Wed Jun 30 22:33:12 2010
@@ -305,4 +305,7 @@ public class MessageExchangeDAOImpl exte
return "mem.mex(direction=" + direction + " id=" + messageExchangeId +
")";
}
+ public boolean lockPremieMessages() {
+ return true;
+ }
}
Modified:
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
---
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
(original)
+++
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
Wed Jun 30 22:33:12 2010
@@ -79,7 +79,7 @@ import org.hibernate.criterion.Projectio
public class BpelDAOConnectionImpl implements BpelDAOConnection,
FilteredInstanceDeletable {
private static final Log __log =
LogFactory.getLog(BpelDAOConnectionImpl.class);
- protected SessionManager _sm;
+ public SessionManager _sm;
public BpelDAOConnectionImpl(SessionManager sm) {
_sm = sm;
Modified:
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
---
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
(original)
+++
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
Wed Jun 30 22:33:12 2010
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFac
import org.apache.ode.bpel.common.CorrelationKey;
import org.apache.ode.bpel.common.CorrelationKeySet;
import org.apache.ode.bpel.dao.*;
+import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.daohib.SessionManager;
import org.apache.ode.daohib.bpel.hobj.HCorrelator;
import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage;
@@ -36,6 +37,7 @@ import org.hibernate.Hibernate;
import org.hibernate.LockMode;
import org.hibernate.Query;
import org.hibernate.Session;
+import org.hibernate.exception.LockAcquisitionException;
import javax.xml.namespace.QName;
@@ -82,7 +84,12 @@ class CorrelatorDaoImpl extends Hibernat
// We really should consider the possibility of multiple messages
matching a criteria.
// When the message is handled, its not too convenient to attempt to
determine if the
// received message conflicts with one already received.
- Iterator mcors = qry.iterate();
+ Iterator mcors;
+ try {
+ mcors = qry.setLockMode("this", LockMode.UPGRADE).iterate();
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
try {
if (!mcors.hasNext()) {
if (__log.isDebugEnabled())
@@ -122,7 +129,13 @@ class CorrelatorDaoImpl extends Hibernat
q.setLockMode("hs", LockMode.UPGRADE);
List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
- for (HCorrelatorSelector selector :
(List<HCorrelatorSelector>)q.list()) {
+ List<HCorrelatorSelector> list;
+ try {
+ list = (List<HCorrelatorSelector>) q.list();
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
+ for (HCorrelatorSelector selector : list) {
if (selector != null) {
boolean isRoutePolicyOne = selector.getRoute() == null ||
"one".equals(selector.getRoute());
if ("all".equals(selector.getRoute()) ||
@@ -135,13 +148,6 @@ class CorrelatorDaoImpl extends Hibernat
if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes);
- // obtain a lock on the correlator to eliminate potential race
condition.
- if(__log.isDebugEnabled()) __log.debug("Obtain record lock on " +
_hobj);
- Query correlatorLockQuery = getSession().createQuery("from HCorrelator
as hc where id = :id");
- correlatorLockQuery.setLong("id", _hobj.getId());
- correlatorLockQuery.setLockMode("hc", LockMode.UPGRADE);
- correlatorLockQuery.list();
-
return routes;
}
@@ -221,7 +227,11 @@ class CorrelatorDaoImpl extends Hibernat
hsel.setCorrelator(_hobj);
hsel.setCreated(new Date());
hsel.setRoute(routePolicy);
- getSession().save(hsel);
+ try {
+ getSession().save(hsel);
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
if (__log.isDebugEnabled())
__log.debug(hdr + "saved " + hsel);
Modified:
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
---
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
(original)
+++
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
Wed Jun 30 22:33:12 2010
@@ -34,6 +34,7 @@ import org.apache.ode.bpel.dao.MessageEx
import org.apache.ode.bpel.dao.PartnerLinkDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.daohib.SessionManager;
import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage;
import org.apache.ode.daohib.bpel.hobj.HMessage;
@@ -43,6 +44,8 @@ import org.apache.ode.daohib.bpel.hobj.H
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.stl.UnaryFunctionEx;
+import org.hibernate.LockMode;
+import org.hibernate.exception.LockAcquisitionException;
import org.w3c.dom.Element;
public class MessageExchangeDaoImpl extends HibernateDao implements
@@ -366,6 +369,14 @@ public class MessageExchangeDaoImpl exte
deleteByIds(HCorrelatorMessage.class,
getSession().getNamedQuery(HCorrelatorMessage.SELECT_CORMESSAGE_IDS_BY_MEX).setParameter("mex",
_hself).list());
}
+ public boolean lockPremieMessages() {
+ try {
+ return
getSession().getNamedQuery(HCorrelatorMessage.SELECT_CORMESSAGE_BY_MEX).setLockMode("m",
LockMode.UPGRADE).setParameter("mex", _hself).list().size() > 0;
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
+ }
+
public void incrementSubscriberCount() {
_hself.incrementSubscriberCount();
}
Modified:
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
URL:
http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
---
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
(original)
+++
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
Wed Jun 30 22:33:12 2010
@@ -21,10 +21,12 @@ package org.apache.ode.daohib.bpel.hobj;
/**
* @hibernate.class table="BPEL_UNMATCHED" lazy="true"
- * @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_INSTANCES" query="select id
from HCorrelatorMessage as m where m.messageExchange in(select mex from
HMessageExchange as mex where mex.instance in (:instances))"
+ * @hibernate.query name="SELECT_CORMESSAGE_BY_MEX" query="from
HCorrelatorMessage as m where m.messageExchange = :mex"
* @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_MEX" query="select id from
HCorrelatorMessage as m where m.messageExchange = :mex"
+ * @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_INSTANCES" query="select id
from HCorrelatorMessage as m where m.messageExchange in(select mex from
HMessageExchange as mex where mex.instance in (:instances))"
*/
public class HCorrelatorMessage extends HObject {
+ public final static String SELECT_CORMESSAGE_BY_MEX =
"SELECT_CORMESSAGE_BY_MEX";
public final static String SELECT_CORMESSAGE_IDS_BY_MEX =
"SELECT_CORMESSAGE_IDS_BY_MEX";
public final static String SELECT_CORMESSAGE_IDS_BY_INSTANCES =
"SELECT_CORMESSAGE_IDS_BY_INSTANCES";
Added:
ode/trunk/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
URL:
http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java?rev=959463&view=auto
==============================================================================
---
ode/trunk/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
(added)
+++
ode/trunk/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
Wed Jun 30 22:33:12 2010
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.daohib.bpel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.daohib.SessionManager;
+import org.apache.ode.daohib.bpel.hobj.HCorrelator;
+
+/**
+ * Testing BpelDAOConnectionImpl.listInstance. We're just producing a lot of
+ * different filter combinations and test if they execute ok. To really test
+ * that the result is the one expected would take a huge test database (with at
+ * least a process and an instance for every possible combination).
+ */
+public class MexTest extends BaseTestDAO {
+
+ private Map<String, List> filterElmts;
+ private ArrayList<String> order;
+
+ protected void setUp() throws Exception {
+ initTM();
+ }
+
+ protected void tearDown() throws Exception {
+ stopTM();
+ }
+
+ public void test() throws Exception {
+ MessageExchangeDAO mex = daoConn.createMessageExchange('M');
+ mex.lockPremieMessages();
+
+ SessionManager sm = ((BpelDAOConnectionImpl) daoConn)._sm;
+ HCorrelator correlator = new HCorrelator();
+ correlator.setCorrelatorId("abc");
+ sm.getSession().save(correlator);
+ new CorrelatorDaoImpl(sm, correlator).dequeueMessage(new
CorrelationKeySet("@2[12~a~b]"));
+ }
+}
Modified:
ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
URL:
http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
---
ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
(original)
+++
ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
Wed Jun 30 22:33:12 2010
@@ -382,4 +382,8 @@ public class MessageExchangeDAOImpl exte
public void setCreateTime(Date createTime) {
_createTime = createTime;
}
+
+ public boolean lockPremieMessages() {
+ return true;
+ }
}
Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
URL:
http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
(original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java Wed
Jun 30 22:33:12 2010
@@ -458,10 +458,14 @@ public final class JacobVPU {
__log.error(msg, iae);
throw new RuntimeException(msg, iae);
} catch (InvocationTargetException e) {
- String msg = __msgs.msgClientMethodException(_method.getName(),
- _methodBody.getClass().getName());
- __log.error(msg, e.getTargetException());
- throw new RuntimeException(e.getTargetException());
+ if (e.getTargetException() instanceof RuntimeException) {
+ throw (RuntimeException) e.getTargetException();
+ } else {
+ String msg =
__msgs.msgClientMethodException(_method.getName(),
+ _methodBody.getClass().getName());
+ __log.error(msg, e.getTargetException());
+ throw new RuntimeException(e.getTargetException());
+ }
} finally {
ctime = System.currentTimeMillis() - ctime;
_statistics.totalClientTimeMs += ctime;