Author: rr
Date: Wed Jun 30 22:33:12 2010
New Revision: 959463

URL: http://svn.apache.org/viewvc?rev=959463&view=rev
Log:
Second matcher + logging fix for job retries

Added:
    
ode/trunk/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
Modified:
    ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
    ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
    
ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
    
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
    
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
    
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
    
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
    
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
    
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
    
ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
    ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java

Modified: ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java 
(original)
+++ ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java Wed Jun 
30 22:33:12 2010
@@ -25,7 +25,9 @@ import java.util.Map;
 
 import javax.transaction.TransactionManager;
 import javax.wsdl.Definition;
+import javax.wsdl.Operation;
 import javax.wsdl.Port;
+import javax.wsdl.PortType;
 import javax.wsdl.Service;
 import javax.wsdl.extensions.UnknownExtensibilityElement;
 import javax.wsdl.extensions.http.HTTPAddress;
@@ -112,7 +114,9 @@ public class ODEService {
             String messageId = new GUID().toString();
             odeMex = _server.getEngine().createMessageExchange("" + messageId, 
_serviceName,
                     msgContext.getAxisOperation().getName().getLocalPart());
-            __log.debug("ODE routed to operation " + odeMex.getOperation() + " 
from service " + _serviceName);
+            __log.debug("ODE routed to portType " + odeMex.getPortType() + " 
operation " + odeMex.getOperation() + " from service " + _serviceName);
+            PortType portType = odeMex.getPortType();
+            Operation operation = odeMex.getOperation();
             odeMex.setProperty("isTwoWay", 
Boolean.toString(msgContext.getAxisOperation() instanceof 
TwoChannelAxisOperation));
             if (odeMex.getOperation() != null) {
                 // Preparing message to send to ODE

Modified: 
ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java 
(original)
+++ ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java 
Wed Jun 30 22:33:12 2010
@@ -176,7 +176,8 @@ public interface Scheduler {
         INVOKE_INTERNAL,
         INVOKE_RESPONSE,
         MATCHER,
-        INVOKE_CHECK
+        INVOKE_CHECK,
+        MEX_MATCHER
     }
 
     public static class JobDetails {

Modified: 
ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- 
ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
 (original)
+++ 
ode/trunk/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
 Wed Jun 30 22:33:12 2010
@@ -252,4 +252,6 @@ public interface MessageExchangeDAO {
      * Deletes messages that arrived before the route is setup
      */
     void releasePremieMessages();
+
+    boolean lockPremieMessages();
 }

Modified: 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
 (original)
+++ 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
 Wed Jun 30 22:33:12 2010
@@ -247,12 +247,15 @@ public class BpelEngineImpl implements B
         case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
             mex = new MyRoleMessageExchangeImpl(process, this, mexdao);
             if (process != null) {
-                OPartnerLink plink = (OPartnerLink) 
process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
-                // the partner link might not be hydrated
-                if (plink != null) {
-                    PortType ptype = plink.myRolePortType;
-                    Operation op = 
plink.getMyRoleOperation(mexdao.getOperation());
-                    mex.setPortOp(ptype, op);
+                Object child = 
process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
+                if (child instanceof OPartnerLink) {
+                    OPartnerLink plink = (OPartnerLink) child;
+                    // the partner link might not be hydrated
+                    if (plink != null) {
+                        PortType ptype = plink.myRolePortType;
+                        Operation op = 
plink.getMyRoleOperation(mexdao.getOperation());
+                        mex.setPortOp(ptype, op);
+                    }
                 }
             }
             break;
@@ -473,14 +476,14 @@ public class BpelEngineImpl implements B
                         }
                     }
                 }
-                if (we.getType().equals(JobType.INVOKE_INTERNAL)) {
+                if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == 
JobType.MEX_MATCHER) {
                     List<BpelProcess> processes = 
getAllProcesses(we.getProcessId());
                     boolean routed = false;
                     jobInfo.jobDetail.detailsExt.put("enqueue", false);
                     for(BpelProcess proc : processes) {
                         routed = routed || 
proc.handleJobDetails(jobInfo.jobDetail);
                     }
-                    if(!routed) {
+                    if(!routed && we.getType() == JobType.INVOKE_INTERNAL) {
                         jobInfo.jobDetail.detailsExt.put("enqueue", true);
                         process.handleJobDetails(jobInfo.jobDetail);
                     }

Modified: 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 (original)
+++ 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 Wed Jun 30 22:33:12 2010
@@ -414,7 +414,7 @@ public class BpelProcess {
      * @see 
org.apache.ode.bpel.engine.BpelProcess#handleJobDetails(java.util.Map<java.lang.String,java.lang.Object>)
      */
     public boolean handleJobDetails(JobDetails jobData) {
-        boolean ret = true;
+        boolean routed = true;
         try {
             _hydrationLatch.latch(1);
             markused();
@@ -425,12 +425,20 @@ public class BpelProcess {
             JobDetails we = jobData;
 
             // Process level events
-            if (we.getType().equals(JobType.INVOKE_INTERNAL)) {
+            if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == 
JobType.MEX_MATCHER) {
                 if (__log.isDebugEnabled()) {
-                    __log.debug("InvokeInternal event for mexid " + 
we.getMexId());
+                    __log.debug(we.getType() + " event for mexid " + 
we.getMexId());
                 }
                 MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) 
_engine.getMessageExchange(we.getMexId());
-                ret = invokeProcess(mex, (Boolean) 
jobData.detailsExt.get("enqueue"));
+               if (we.getType() == JobType.MEX_MATCHER && 
!mex.getDAO().lockPremieMessages()) {
+                       //Skip if already processed
+                       return true;
+               }
+
+                routed = invokeProcess(mex, (Boolean) 
jobData.detailsExt.get("enqueue"));
+                if (we.getType() == JobType.MEX_MATCHER && routed) {
+                       mex.getDAO().releasePremieMessages();
+                }
             } else {
                 // Instance level events
                 ProcessInstanceDAO procInstance = 
getProcessDAO().getInstance(we.getInstanceId());
@@ -477,7 +485,7 @@ public class BpelProcess {
         } finally {
             _hydrationLatch.release(1);
         }
-        return ret;
+        return routed;
     }
 
     private void setRoles(OProcess oprocess) {

Modified: 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
 (original)
+++ 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
 Wed Jun 30 22:33:12 2010
@@ -45,6 +45,8 @@ import org.apache.ode.bpel.iapi.Endpoint
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessState;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.bpel.iapi.Scheduler.JobType;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.o.OMessageVarType;
 import org.apache.ode.bpel.o.OPartnerLink;
@@ -274,6 +276,18 @@ public class PartnerLinkMyRoleImpl exten
 
                 // No match, means we add message exchange to the queue.
                 routing.correlator.enqueueMessage(mex.getDAO(), 
routing.wholeKeySet);
+                
+                // Second matcher needs to be registered here
+                JobDetails we = new JobDetails();
+                we.setType(JobType.MEX_MATCHER);
+                we.setProcessId(_process.getPID());
+                we.setMexId(mex.getMessageExchangeId());
+                we.setInMem(_process.isInMemory());
+                if(_process.isInMemory()){
+                    
_process._engine._contexts.scheduler.scheduleVolatileJob(true, we);
+                }else{
+                    
_process._engine._contexts.scheduler.schedulePersistedJob(we, null);
+                }
             }
         }
     }

Modified: 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
 (original)
+++ 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
 Wed Jun 30 22:33:12 2010
@@ -305,4 +305,7 @@ public class MessageExchangeDAOImpl exte
         return "mem.mex(direction=" + direction + " id=" + messageExchangeId + 
")";
     }
 
+       public boolean lockPremieMessages() {
+               return true;
+       }
 }

Modified: 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
 (original)
+++ 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
 Wed Jun 30 22:33:12 2010
@@ -79,7 +79,7 @@ import org.hibernate.criterion.Projectio
 public class BpelDAOConnectionImpl implements BpelDAOConnection, 
FilteredInstanceDeletable {
     private static final Log __log = 
LogFactory.getLog(BpelDAOConnectionImpl.class);
 
-    protected SessionManager _sm;
+    public SessionManager _sm;
 
     public BpelDAOConnectionImpl(SessionManager sm) {
         _sm = sm;

Modified: 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
 (original)
+++ 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
 Wed Jun 30 22:33:12 2010
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.ode.bpel.common.CorrelationKey;
 import org.apache.ode.bpel.common.CorrelationKeySet;
 import org.apache.ode.bpel.dao.*;
+import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.daohib.SessionManager;
 import org.apache.ode.daohib.bpel.hobj.HCorrelator;
 import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage;
@@ -36,6 +37,7 @@ import org.hibernate.Hibernate;
 import org.hibernate.LockMode;
 import org.hibernate.Query;
 import org.hibernate.Session;
+import org.hibernate.exception.LockAcquisitionException;
 
 import javax.xml.namespace.QName;
 
@@ -82,7 +84,12 @@ class CorrelatorDaoImpl extends Hibernat
         // We really should consider the possibility of multiple messages 
matching a criteria.
         // When the message is handled, its not too convenient to attempt to 
determine if the
         // received message conflicts with one already received.
-        Iterator mcors = qry.iterate();
+        Iterator mcors;
+        try {
+            mcors = qry.setLockMode("this", LockMode.UPGRADE).iterate();
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
         try {
             if (!mcors.hasNext()) {
                 if (__log.isDebugEnabled())
@@ -122,7 +129,13 @@ class CorrelatorDaoImpl extends Hibernat
         q.setLockMode("hs", LockMode.UPGRADE);
 
         List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
-        for (HCorrelatorSelector selector : 
(List<HCorrelatorSelector>)q.list()) {
+        List<HCorrelatorSelector> list;
+        try {
+            list = (List<HCorrelatorSelector>) q.list();
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
+        for (HCorrelatorSelector selector : list) {
             if (selector != null) {
                 boolean isRoutePolicyOne = selector.getRoute() == null || 
"one".equals(selector.getRoute());
                 if ("all".equals(selector.getRoute()) ||
@@ -135,13 +148,6 @@ class CorrelatorDaoImpl extends Hibernat
 
         if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes);
 
-        // obtain a lock on the correlator to eliminate potential race 
condition.
-        if(__log.isDebugEnabled()) __log.debug("Obtain record lock on " + 
_hobj);
-        Query correlatorLockQuery = getSession().createQuery("from HCorrelator 
as hc where id = :id");
-        correlatorLockQuery.setLong("id", _hobj.getId());
-        correlatorLockQuery.setLockMode("hc", LockMode.UPGRADE);
-        correlatorLockQuery.list();
-
         return routes;
     }
 
@@ -221,7 +227,11 @@ class CorrelatorDaoImpl extends Hibernat
         hsel.setCorrelator(_hobj);
         hsel.setCreated(new Date());
         hsel.setRoute(routePolicy);
-        getSession().save(hsel);
+        try {
+            getSession().save(hsel);
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
 
         if (__log.isDebugEnabled())
             __log.debug(hdr + "saved " + hsel);

Modified: 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
 (original)
+++ 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
 Wed Jun 30 22:33:12 2010
@@ -34,6 +34,7 @@ import org.apache.ode.bpel.dao.MessageEx
 import org.apache.ode.bpel.dao.PartnerLinkDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.daohib.SessionManager;
 import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage;
 import org.apache.ode.daohib.bpel.hobj.HMessage;
@@ -43,6 +44,8 @@ import org.apache.ode.daohib.bpel.hobj.H
 import org.apache.ode.utils.DOMUtils;
 import org.apache.ode.utils.stl.CollectionsX;
 import org.apache.ode.utils.stl.UnaryFunctionEx;
+import org.hibernate.LockMode;
+import org.hibernate.exception.LockAcquisitionException;
 import org.w3c.dom.Element;
 
 public class MessageExchangeDaoImpl extends HibernateDao implements
@@ -366,6 +369,14 @@ public class MessageExchangeDaoImpl exte
         deleteByIds(HCorrelatorMessage.class, 
getSession().getNamedQuery(HCorrelatorMessage.SELECT_CORMESSAGE_IDS_BY_MEX).setParameter("mex",
 _hself).list());
     }
 
+    public boolean lockPremieMessages() {
+        try {
+            return 
getSession().getNamedQuery(HCorrelatorMessage.SELECT_CORMESSAGE_BY_MEX).setLockMode("m",
 LockMode.UPGRADE).setParameter("mex", _hself).list().size() > 0;
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
+    }
+
     public void incrementSubscriberCount() {
         _hself.incrementSubscriberCount();
     }

Modified: 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
 (original)
+++ 
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
 Wed Jun 30 22:33:12 2010
@@ -21,10 +21,12 @@ package org.apache.ode.daohib.bpel.hobj;
 
 /**
  * @hibernate.class table="BPEL_UNMATCHED" lazy="true"
- * @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_INSTANCES" query="select id 
from HCorrelatorMessage as m where m.messageExchange in(select mex from 
HMessageExchange as mex where mex.instance in (:instances))"
+ * @hibernate.query name="SELECT_CORMESSAGE_BY_MEX" query="from 
HCorrelatorMessage as m where m.messageExchange = :mex"
  * @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_MEX" query="select id from 
HCorrelatorMessage as m where m.messageExchange = :mex"
+ * @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_INSTANCES" query="select id 
from HCorrelatorMessage as m where m.messageExchange in(select mex from 
HMessageExchange as mex where mex.instance in (:instances))"
  */
 public class HCorrelatorMessage extends HObject {
+    public final static String SELECT_CORMESSAGE_BY_MEX = 
"SELECT_CORMESSAGE_BY_MEX";
     public final static String SELECT_CORMESSAGE_IDS_BY_MEX = 
"SELECT_CORMESSAGE_IDS_BY_MEX";
     public final static String SELECT_CORMESSAGE_IDS_BY_INSTANCES = 
"SELECT_CORMESSAGE_IDS_BY_INSTANCES";
 

Added: 
ode/trunk/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java?rev=959463&view=auto
==============================================================================
--- 
ode/trunk/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java 
(added)
+++ 
ode/trunk/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java 
Wed Jun 30 22:33:12 2010
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.daohib.bpel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.daohib.SessionManager;
+import org.apache.ode.daohib.bpel.hobj.HCorrelator;
+
+/**
+ * Testing BpelDAOConnectionImpl.listInstance. We're just producing a lot of
+ * different filter combinations and test if they execute ok. To really test
+ * that the result is the one expected would take a huge test database (with at
+ * least a process and an instance for every possible combination).
+ */
+public class MexTest extends BaseTestDAO {
+
+    private Map<String, List> filterElmts;
+    private ArrayList<String> order;
+
+    protected void setUp() throws Exception {
+        initTM();
+    }
+
+    protected void tearDown() throws Exception {
+        stopTM();
+    }
+
+    public void test() throws Exception {
+        MessageExchangeDAO mex = daoConn.createMessageExchange('M');
+        mex.lockPremieMessages();
+        
+        SessionManager sm = ((BpelDAOConnectionImpl) daoConn)._sm;
+        HCorrelator correlator = new HCorrelator();
+        correlator.setCorrelatorId("abc");
+        sm.getSession().save(correlator);
+        new CorrelatorDaoImpl(sm, correlator).dequeueMessage(new 
CorrelationKeySet("@2[12~a~b]"));
+    }
+}

Modified: 
ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- 
ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
 (original)
+++ 
ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
 Wed Jun 30 22:33:12 2010
@@ -382,4 +382,8 @@ public class MessageExchangeDAOImpl exte
     public void setCreateTime(Date createTime) {
         _createTime = createTime;
     }
+
+       public boolean lockPremieMessages() {
+               return true;
+       }
 }

Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?rev=959463&r1=959462&r2=959463&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java 
(original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java Wed 
Jun 30 22:33:12 2010
@@ -458,10 +458,14 @@ public final class JacobVPU {
                 __log.error(msg, iae);
                 throw new RuntimeException(msg, iae);
             } catch (InvocationTargetException e) {
-                String msg = __msgs.msgClientMethodException(_method.getName(),
-                        _methodBody.getClass().getName());
-                __log.error(msg, e.getTargetException());
-                throw new RuntimeException(e.getTargetException());
+                if (e.getTargetException() instanceof RuntimeException) {
+                    throw (RuntimeException) e.getTargetException();
+                } else {
+                    String msg = 
__msgs.msgClientMethodException(_method.getName(),
+                            _methodBody.getClass().getName());
+                    __log.error(msg, e.getTargetException());
+                    throw new RuntimeException(e.getTargetException());
+                }
             } finally {
                 ctime = System.currentTimeMillis() - ctime;
                 _statistics.totalClientTimeMs += ctime;


Reply via email to