Repository: ode Updated Branches: refs/heads/ode-1.3.x c0c7b497a -> ba61210b4
Second matcher + logging fix for job retries. Ported from trunk commit https://github.com/apache/ode/commit/57e6d71bc153464ab71eb982293d138abbaf85b6 Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/566d3d1a Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/566d3d1a Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/566d3d1a Branch: refs/heads/ode-1.3.x Commit: 566d3d1ae886a7459cdfb75155af81cb9c82c91d Parents: c0c7b49 Author: sathwik <[email protected]> Authored: Wed Jan 6 13:23:03 2016 +0530 Committer: sathwik <[email protected]> Committed: Wed Jan 6 13:23:03 2016 +0530 ---------------------------------------------------------------------- .../java/org/apache/ode/axis2/ODEService.java | 2 +- .../org/apache/ode/bpel/iapi/Scheduler.java | 9 +- .../apache/ode/bpel/dao/MessageExchangeDAO.java | 2 + .../apache/ode/bpel/engine/BpelEngineImpl.java | 21 +++-- .../org/apache/ode/bpel/engine/BpelProcess.java | 18 ++-- .../ode/bpel/engine/PartnerLinkMyRoleImpl.java | 14 +++ .../ode/bpel/memdao/MessageExchangeDAOImpl.java | 3 + .../ode/daohib/bpel/CorrelatorDaoImpl.java | 30 ++++--- .../ode/daohib/bpel/MessageExchangeDaoImpl.java | 13 ++- .../daohib/bpel/hobj/HCorrelatorMessage.java | 2 + .../org/apache/ode/daohib/bpel/MexTest.java | 93 ++++++++++++++++++++ .../ode/dao/jpa/MessageExchangeDAOImpl.java | 4 + .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 12 ++- 13 files changed, 192 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/axis2/src/main/java/org/apache/ode/axis2/ODEService.java ---------------------------------------------------------------------- diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEService.java b/axis2/src/main/java/org/apache/ode/axis2/ODEService.java index f3876d4..35c090b 100644 --- a/axis2/src/main/java/org/apache/ode/axis2/ODEService.java +++ b/axis2/src/main/java/org/apache/ode/axis2/ODEService.java @@ -112,7 +112,7 @@ public class ODEService { String messageId = new GUID().toString(); odeMex = _server.getEngine().createMessageExchange("" + messageId, _serviceName, msgContext.getAxisOperation().getName().getLocalPart()); - __log.debug("ODE routed to operation " + odeMex.getOperation() + " from service " + _serviceName); + __log.debug("ODE routed to portType " + odeMex.getPortType() + " operation " + odeMex.getOperation() + " from service " + _serviceName); odeMex.setProperty("isTwoWay", Boolean.toString(msgContext.getAxisOperation() instanceof TwoChannelAxisOperation)); if (odeMex.getOperation() != null) { // Preparing message to send to ODE http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java ---------------------------------------------------------------------- diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java b/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java index 52a6a31..85079b2 100644 --- a/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java +++ b/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java @@ -208,7 +208,14 @@ public interface Scheduler { * out, checks whether a response has arrived and if not, it marks the MEX as * faulted. */ - INVOKE_CHECK + INVOKE_CHECK, + + /** + * is used to avoid the race condition when a message has been correlated but + * no process instance is able to process it and the route has been added + * meanwhile. It just retries the correlation. + */ + MEX_MATCHER } public static class JobDetails { http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java ---------------------------------------------------------------------- diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java index 786c102..5de9bbe 100644 --- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java +++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java @@ -252,4 +252,6 @@ public interface MessageExchangeDAO { * Deletes messages that arrived before the route is setup */ void releasePremieMessages(); + + boolean lockPremieMessages(); } http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java index 7fdf924..6a1edce 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java @@ -247,13 +247,16 @@ public class BpelEngineImpl implements BpelEngine { case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE: mex = new MyRoleMessageExchangeImpl(process, this, mexdao); if (process != null) { - OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId()); - // the partner link might not be hydrated - if (plink != null) { - PortType ptype = plink.myRolePortType; - Operation op = plink.getMyRoleOperation(mexdao.getOperation()); - mex.setPortOp(ptype, op); - } + Object child = process.getOProcess().getChild(mexdao.getPartnerLinkModelId()); + if (child instanceof OPartnerLink) { + OPartnerLink plink = (OPartnerLink) child; + // the partner link might not be hydrated + if (plink != null) { + PortType ptype = plink.myRolePortType; + Operation op = plink.getMyRoleOperation(mexdao.getOperation()); + mex.setPortOp(ptype, op); + } + } } break; default: @@ -477,7 +480,7 @@ public class BpelEngineImpl implements BpelEngine { } } - if (we.getType().equals(JobType.INVOKE_INTERNAL)) { + if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) { List<BpelProcess> processes = getAllProcesses(we.getProcessId()); boolean routed = false; jobInfo.jobDetail.detailsExt.put("enqueue", false); @@ -486,7 +489,7 @@ public class BpelEngineImpl implements BpelEngine { routed = routed || proc.handleJobDetails(jobInfo.jobDetail); } - if(!routed) { + if(!routed && we.getType() == JobType.INVOKE_INTERNAL) { jobInfo.jobDetail.detailsExt.put("enqueue", true); process.handleJobDetails(jobInfo.jobDetail); } http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java index cfddef7..afe669a 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java @@ -414,7 +414,7 @@ public class BpelProcess { * @see org.apache.ode.bpel.engine.BpelProcess#handleJobDetails(java.util.Map<java.lang.String,java.lang.Object>) */ public boolean handleJobDetails(JobDetails jobData) { - boolean ret = true; + boolean routed = true; try { _hydrationLatch.latch(1); markused(); @@ -425,12 +425,20 @@ public class BpelProcess { JobDetails we = jobData; // Process level events - if (we.getType().equals(JobType.INVOKE_INTERNAL)) { + if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) { if (__log.isDebugEnabled()) { - __log.debug("InvokeInternal event for mexid " + we.getMexId()); + __log.debug(we.getType() + " event for mexid " + we.getMexId()); } MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId()); - ret = invokeProcess(mex, (Boolean) jobData.detailsExt.get("enqueue")); + if (we.getType() == JobType.MEX_MATCHER && !mex.getDAO().lockPremieMessages()) { + //Skip if already processed + return true; + } + + routed = invokeProcess(mex, (Boolean) jobData.detailsExt.get("enqueue")); + if (we.getType() == JobType.MEX_MATCHER && routed) { + mex.getDAO().releasePremieMessages(); + } } else { // Instance level events ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getInstanceId()); @@ -477,7 +485,7 @@ public class BpelProcess { } finally { _hydrationLatch.release(1); } - return ret; + return routed; } private void setRoles(OProcess oprocess) { http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java index 4537286..203a199 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java @@ -45,6 +45,8 @@ import org.apache.ode.bpel.iapi.Endpoint; import org.apache.ode.bpel.iapi.MessageExchange; import org.apache.ode.bpel.iapi.MyRoleMessageExchange; import org.apache.ode.bpel.iapi.ProcessState; +import org.apache.ode.bpel.iapi.Scheduler.JobDetails; +import org.apache.ode.bpel.iapi.Scheduler.JobType; import org.apache.ode.bpel.intercept.InterceptorInvoker; import org.apache.ode.bpel.o.OMessageVarType; import org.apache.ode.bpel.o.OPartnerLink; @@ -275,6 +277,18 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl { // No match, means we add message exchange to the queue. routing.correlator.enqueueMessage(mex.getDAO(), routing.wholeKeySet); + + //Second matcher needs to be registered here + JobDetails we = new JobDetails(); + we.setType(JobType.MEX_MATCHER); + we.setProcessId(_process.getPID()); + we.setMexId(mex.getMessageExchangeId()); + we.setInMem(_process.isInMemory()); + if(_process.isInMemory()){ + _process._engine._contexts.scheduler.scheduleVolatileJob(true, we); + }else{ + _process._engine._contexts.scheduler.schedulePersistedJob(we, null); + } } } } http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java ---------------------------------------------------------------------- diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java index 9f71b30..5369694 100644 --- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java +++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java @@ -305,4 +305,7 @@ public class MessageExchangeDAOImpl extends DaoBaseImpl implements MessageExchan return "mem.mex(direction=" + direction + " id=" + messageExchangeId + ")"; } + public boolean lockPremieMessages() { + return true; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java index 42fc3e6..c4cbd6c 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.ode.bpel.common.CorrelationKey; import org.apache.ode.bpel.common.CorrelationKeySet; import org.apache.ode.bpel.dao.*; +import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.daohib.SessionManager; import org.apache.ode.daohib.bpel.hobj.HCorrelator; import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage; @@ -36,6 +37,7 @@ import org.hibernate.Hibernate; import org.hibernate.LockMode; import org.hibernate.Query; import org.hibernate.Session; +import org.hibernate.exception.LockAcquisitionException; import javax.xml.namespace.QName; @@ -82,7 +84,12 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { // We really should consider the possibility of multiple messages matching a criteria. // When the message is handled, its not too convenient to attempt to determine if the // received message conflicts with one already received. - Iterator mcors = qry.iterate(); + Iterator mcors; + try { + mcors = qry.setLockMode("this", LockMode.UPGRADE).iterate(); + } catch (LockAcquisitionException e) { + throw new Scheduler.JobProcessorException(e, true); + } try { if (!mcors.hasNext()) { if (__log.isDebugEnabled()) @@ -122,7 +129,13 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { q.setLockMode("hs", LockMode.UPGRADE); List<HProcessInstance> targets = new ArrayList<HProcessInstance>(); - for (HCorrelatorSelector selector : (List<HCorrelatorSelector>)q.list()) { + List<HCorrelatorSelector> list; + try { + list = (List<HCorrelatorSelector>) q.list(); + } catch (LockAcquisitionException e) { + throw new Scheduler.JobProcessorException(e, true); + } + for (HCorrelatorSelector selector : list) { if (selector != null) { boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute()); if ("all".equals(selector.getRoute()) || @@ -135,13 +148,6 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes); - // obtain a lock on the correlator to eliminate potential race condition. - if(__log.isDebugEnabled()) __log.debug("Obtain record lock on " + _hobj); - Query correlatorLockQuery = getSession().createQuery("from HCorrelator as hc where id = :id"); - correlatorLockQuery.setLong("id", _hobj.getId()); - correlatorLockQuery.setLockMode("hc", LockMode.UPGRADE); - correlatorLockQuery.list(); - return routes; } @@ -221,7 +227,11 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO { hsel.setCorrelator(_hobj); hsel.setCreated(new Date()); hsel.setRoute(routePolicy); - getSession().save(hsel); + try { + getSession().save(hsel); + } catch (LockAcquisitionException e) { + throw new Scheduler.JobProcessorException(e, true); + } if (__log.isDebugEnabled()) __log.debug(hdr + "saved " + hsel); http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java index 5b711ce..91fd903 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java @@ -34,6 +34,7 @@ import org.apache.ode.bpel.dao.MessageExchangeDAO; import org.apache.ode.bpel.dao.PartnerLinkDAO; import org.apache.ode.bpel.dao.ProcessDAO; import org.apache.ode.bpel.dao.ProcessInstanceDAO; +import org.apache.ode.bpel.iapi.Scheduler; import org.apache.ode.daohib.SessionManager; import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage; import org.apache.ode.daohib.bpel.hobj.HMessage; @@ -43,6 +44,8 @@ import org.apache.ode.daohib.bpel.hobj.HProcessInstance; import org.apache.ode.utils.DOMUtils; import org.apache.ode.utils.stl.CollectionsX; import org.apache.ode.utils.stl.UnaryFunctionEx; +import org.hibernate.LockMode; +import org.hibernate.exception.LockAcquisitionException; import org.w3c.dom.Element; public class MessageExchangeDaoImpl extends HibernateDao implements @@ -377,5 +380,13 @@ public class MessageExchangeDaoImpl extends HibernateDao implements getSession().delete(_hself); // This deletes endpoint LData, callbackEndpoint LData, request HMessage, response HMessage, HMessageExchangeProperty } - + + public boolean lockPremieMessages() { + try { + return getSession().getNamedQuery(HCorrelatorMessage.SELECT_CORMESSAGE_BY_MEX).setLockMode("m", LockMode.UPGRADE).setParameter("mex", _hself).list().size() > 0; + } catch (LockAcquisitionException e) { + throw new Scheduler.JobProcessorException(e, true); + } + } + } http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java index 92a7747..0014244 100644 --- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java +++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java @@ -23,10 +23,12 @@ package org.apache.ode.daohib.bpel.hobj; * @hibernate.class table="BPEL_UNMATCHED" lazy="true" * @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_INSTANCES" query="select id from HCorrelatorMessage as m where m.messageExchange in(select mex from HMessageExchange as mex where mex.instance in (:instances))" * @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_MEX" query="select id from HCorrelatorMessage as m where m.messageExchange = :mex" + * @hibernate.query name="SELECT_CORMESSAGE_BY_MEX" query="from HCorrelatorMessage as m where m.messageExchange = :mex" */ public class HCorrelatorMessage extends HObject { public final static String SELECT_CORMESSAGE_IDS_BY_MEX = "SELECT_CORMESSAGE_IDS_BY_MEX"; public final static String SELECT_CORMESSAGE_IDS_BY_INSTANCES = "SELECT_CORMESSAGE_IDS_BY_INSTANCES"; + public final static String SELECT_CORMESSAGE_BY_MEX = "SELECT_CORMESSAGE_BY_MEX"; private HMessageExchange _messageExchange; private HCorrelator _correlator; http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java ---------------------------------------------------------------------- diff --git a/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java b/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java new file mode 100644 index 0000000..741a513 --- /dev/null +++ b/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ode.daohib.bpel; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.xml.namespace.QName; + +import org.apache.ode.bpel.common.CorrelationKeySet; +import org.apache.ode.bpel.common.InstanceFilter; +import org.apache.ode.bpel.dao.CorrelatorDAO; +import org.apache.ode.bpel.dao.MessageExchangeDAO; +import org.apache.ode.bpel.dao.ProcessDAO; +import org.apache.ode.bpel.dao.ProcessInstanceDAO; +import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY; +import org.apache.ode.daohib.SessionManager; +import org.apache.ode.daohib.bpel.hobj.HCorrelator; + +/** + * Testing BpelDAOConnectionImpl.listInstance. We're just producing a lot of + * different filter combinations and test if they execute ok. To really test + * that the result is the one expected would take a huge test database (with at + * least a process and an instance for every possible combination). + */ +public class MexTest extends BaseTestDAO { + + private Map<String, List> filterElmts; + private ArrayList<String> order; + + protected void setUp() throws Exception { + initTM(); + } + + protected void tearDown() throws Exception { + stopTM(); + } + + public void test() throws Exception { + MessageExchangeDAO mex = daoConn.createMessageExchange('M'); + mex.lockPremieMessages(); + + SessionManager sm = ((BpelDAOConnectionImpl) daoConn)._sm; + HCorrelator correlator = new HCorrelator(); + correlator.setCorrelatorId("abc"); + sm.getSession().save(correlator); + new CorrelatorDaoImpl(sm, correlator).dequeueMessage(new CorrelationKeySet("@2[12~a~b]")); + } + + public void testCleanup() throws Exception { + SessionManager sm = ((BpelDAOConnectionImpl) daoConn)._sm; + ProcessDAO p = daoConn.createProcess(QName.valueOf("abc"), QName.valueOf("abc"), "abc", 1); + CorrelatorDAO correlator = p.addCorrelator("abc"); + ProcessInstanceDAO instance = p.createInstance(correlator); + + MessageExchangeDAO mex = daoConn.createMessageExchange('M'); + mex.setProperty("abc", "def"); + mex.setInstance(instance); + + txm.commit(); + txm.begin(); + assertEquals(1, sm.getSession().createSQLQuery("select count(*) from BPEL_MEX_PROPS").list().get(0)); + + Set<CLEANUP_CATEGORY> cleanupCategories = EnumSet.allOf(CLEANUP_CATEGORY.class); + instance.delete(cleanupCategories); + txm.commit(); + txm.begin(); + + assertEquals(0, sm.getSession().createSQLQuery("select count(*) from BPEL_MEX_PROPS").list().get(0)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java ---------------------------------------------------------------------- diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java index d9e5617..c0f35ff 100644 --- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java +++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java @@ -383,4 +383,8 @@ public class MessageExchangeDAOImpl extends OpenJPADAO implements MessageExchang public void setCreateTime(Date createTime) { _createTime = createTime; } + + public boolean lockPremieMessages() { + return true; + } } http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java ---------------------------------------------------------------------- diff --git a/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java index 1ea83d6..0d4f622 100644 --- a/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java +++ b/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java @@ -458,10 +458,14 @@ public final class JacobVPU { __log.error(msg, iae); throw new RuntimeException(msg, iae); } catch (InvocationTargetException e) { - String msg = __msgs.msgClientMethodException(_method.getName(), - _methodBody.getClass().getName()); - __log.error(msg, e.getTargetException()); - throw new RuntimeException(e.getTargetException()); + if (e.getTargetException() instanceof RuntimeException) { + throw (RuntimeException) e.getTargetException(); + } else { + String msg = __msgs.msgClientMethodException(_method.getName(), + _methodBody.getClass().getName()); + __log.error(msg, e.getTargetException()); + throw new RuntimeException(e.getTargetException()); + } } finally { ctime = System.currentTimeMillis() - ctime; _statistics.totalClientTimeMs += ctime;
