Author: boisvert
Date: Tue Oct 27 22:15:03 2009
New Revision: 830367

URL: http://svn.apache.org/viewvc?rev=830367&view=rev
Log:
-Introduced new property "p2p.mex.timeout" used as default value for 
process-to-process mex timeout
-Changed _serviceMap type to Map<QName, List<BpelProcess>> to optimize the 
common routing case
-MEX timeout no longer multiplied by 1.5 for invokeCheck


Modified:
    
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    
ode/branches/APACHE_ODE_1.X/utils/src/main/java/org/apache/ode/utils/Properties.java

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?rev=830367&r1=830366&r2=830367&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
 Tue Oct 27 22:15:03 2009
@@ -104,8 +104,8 @@
     /** Active processes, keyed by process id. */
     final HashMap<QName, BpelProcess> _activeProcesses = new HashMap<QName, 
BpelProcess>();
 
-    /** Mapping from myrole endpoint name to active process. */
-    private final HashMap<Endpoint, List<BpelProcess>> _serviceMap = new 
HashMap<Endpoint, List<BpelProcess>>();
+    /** Mapping from myrole service name to active process. */
+    private final HashMap<QName, List<BpelProcess>> _serviceMap = new 
HashMap<QName, List<BpelProcess>>();
 
     /** Mapping from a potentially shared endpoint to its EPR */ 
     private SharedEndpoints _sharedEps;     
@@ -265,10 +265,9 @@
             if (__log.isDebugEnabled())
                 __log.debug("Deactivating process " + p.getPID());
 
-            Iterator<Map.Entry<Endpoint,List<BpelProcess>>> serviceIter = 
_serviceMap.entrySet().iterator();
+            Iterator<List<BpelProcess>> serviceIter = 
_serviceMap.values().iterator();
             while (serviceIter.hasNext()) {
-                Map.Entry<Endpoint,List<BpelProcess>> processEntry = 
serviceIter.next();
-                Iterator<BpelProcess> entryProcesses = 
processEntry.getValue().iterator();
+                Iterator<BpelProcess> entryProcesses = 
serviceIter.next().iterator();
                 while (entryProcesses.hasNext()) {
                     BpelProcess entryProcess = entryProcesses.next();
                     if (entryProcess.getPID().equals(process)) {
@@ -303,10 +302,10 @@
         _activeProcesses.put(process.getPID(), process);
         for (Endpoint e : process.getServiceNames()) {
             __log.debug("Register process: serviceId=" + e + ", process=" + 
process);
-            List<BpelProcess> processes = _serviceMap.get(e);
+            List<BpelProcess> processes = _serviceMap.get(e.serviceName);
             if (processes == null) {
                 processes = new ArrayList<BpelProcess>();
-                _serviceMap.put(e, processes);
+                _serviceMap.put(e.serviceName, processes);
             }
             // Remove any older version of the process from the list
             Iterator<BpelProcess> processesIter = processes.iterator();
@@ -336,12 +335,7 @@
     List<BpelProcess> route(QName service, Message request) {
         // TODO: use the message to route to the correct service if more than
         // one service is listening on the same endpoint.
-
-        List<BpelProcess> routed = null;
-        for (Endpoint endpoint : _serviceMap.keySet()) {
-            if (endpoint.serviceName.equals(service))
-                routed = _serviceMap.get(endpoint);
-        }
+        List<BpelProcess> routed = _serviceMap.get(service);
         if (__log.isDebugEnabled())
             __log.debug("Routed: svcQname " + service + " --> " + routed);
         return routed;

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=830367&r1=830366&r2=830367&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 Tue Oct 27 22:15:03 2009
@@ -1100,18 +1100,28 @@
                     PROCESS_MEMORY_TO_SERIALIZED_SIZE_RATIO;
     }
 
-    public long getTimeout(OPartnerLink partnerLink) {
+    public long getTimeout(OPartnerLink partnerLink, boolean p2p) {
         // OPartnerLink, PartnerLinkPartnerRoleImpl
         final PartnerLinkPartnerRoleImpl linkPartnerRole = 
_partnerRoles.get(partnerLink);
         long timeout = Properties.DEFAULT_MEX_TIMEOUT;
         if (linkPartnerRole._initialEPR != null) {
-            String timeout_property = 
_pconf.getEndpointProperties(linkPartnerRole._initialEPR).get(Properties.PROP_MEX_TIMEOUT);
-            if (timeout_property != null) {
+            String property = null;
+            String value = null;
+            Map<String, String> props = 
_pconf.getEndpointProperties(linkPartnerRole._initialEPR);
+            if (p2p) {
+                property = Properties.PROP_P2P_MEX_TIMEOUT;
+                value = props.get(property);
+            }
+            if (value == null) {
+                property = Properties.PROP_MEX_TIMEOUT;
+                value = props.get(property);
+            }
+            if (value != null) {
                 try {
-                    timeout = Long.parseLong(timeout_property);
+                    timeout = Long.parseLong(value);
                 } catch (NumberFormatException e) {
                     if (__log.isWarnEnabled())
-                        __log.warn("Mal-formatted Property: [" + 
Properties.PROP_MEX_TIMEOUT + "=" + timeout_property + "] Default value (" + 
timeout + ") will be used");
+                        __log.warn("Mal-formatted Property: [" + property + 
"=" + value + "] Default value (" + timeout + ") will be used");
                 }
             }
         }

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=830367&r1=830366&r2=830367&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 Tue Oct 27 22:15:03 2009
@@ -771,7 +771,7 @@
             myRoleMex.invoke(odeRequest);
 
             // Can't expect any sync response
-            scheduleInvokeCheck(mex, partnerLink.partnerLink);
+            scheduleInvokeCheck(mex, partnerLink.partnerLink, true);
             mex.replyAsync();
         } else {
             // If we couldn't find the endpoint, then there is no sense
@@ -780,7 +780,7 @@
                 mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
                 mex.setStatus(MessageExchange.Status.REQUEST);
                 // Assuming an unreliable protocol, we schedule a task to 
check if recovery mode will be needed
-                scheduleInvokeCheck(mex, partnerLink.partnerLink);
+                scheduleInvokeCheck(mex, partnerLink.partnerLink, false);
                 _bpelProcess._engine._contexts.mexContext.invokePartner(mex);
             } else {
                 __log.error("Couldn't find endpoint for partner EPR " + 
DOMUtils.domToString(partnerEPR));
@@ -832,7 +832,7 @@
         return _bpelProcess;
     }
 
-    private void scheduleInvokeCheck(PartnerRoleMessageExchangeImpl mex, 
OPartnerLink partnerLink) {
+    private void scheduleInvokeCheck(PartnerRoleMessageExchangeImpl mex, 
OPartnerLink partnerLink, boolean p2p) {
         boolean isTwoWay = mex.getMessageExchangePattern() ==
                 
org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
         if (!_bpelProcess.isInMemory() && isTwoWay) {
@@ -842,7 +842,7 @@
             event.setInMem(false);
             event.setType(WorkEvent.Type.INVOKE_CHECK);
             // use a greater timeout to make sure the check job does not get 
executed while the service invocation is still waiting for a response
-            final long timeout = (long) 
(getBpelProcess().getTimeout(partnerLink)*1.5);
+            long timeout = getBpelProcess().getTimeout(partnerLink, p2p);
             if (__log.isDebugEnabled()) __log.debug("Creating invocation check 
event in "+timeout+"ms for mexid " + mex.getMessageExchangeId());
             Date future = new Date(System.currentTimeMillis() + timeout);
             String jobId = 
_bpelProcess._engine._contexts.scheduler.schedulePersistedJob(event.getDetail(),
 future);

Modified: 
ode/branches/APACHE_ODE_1.X/utils/src/main/java/org/apache/ode/utils/Properties.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/utils/src/main/java/org/apache/ode/utils/Properties.java?rev=830367&r1=830366&r2=830367&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/utils/src/main/java/org/apache/ode/utils/Properties.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/utils/src/main/java/org/apache/ode/utils/Properties.java
 Tue Oct 27 22:15:03 2009
@@ -48,13 +48,18 @@
 public class Properties {
 
     /**
-     * Property used to define how long (in miiliseconds) the message will 
wait for a response. Default value is {@link #DEFAULT_MEX_TIMEOUT}
+     * Property used to define how long (in milliseconds) the message will 
wait for a response. Default value is {@link #DEFAULT_MEX_TIMEOUT}
      */
     public static final String PROP_MEX_TIMEOUT = "mex.timeout";
+
+    /**
+     * Property used to define how long (in milliseconds) the message will 
wait for a response for process-to-process invocations.
+     */
+    public static final String PROP_P2P_MEX_TIMEOUT = "p2p.mex.timeout";
+
     // its default value
     public static final int DEFAULT_MEX_TIMEOUT = 2 * 60 * 1000;
 
-
     public static final String PROP_HTTP_CONNECTION_TIMEOUT = 
HttpConnectionParams.CONNECTION_TIMEOUT;
     public static final String PROP_HTTP_SOCKET_TIMEOUT = 
HttpMethodParams.SO_TIMEOUT;
     public static final String PROP_HTTP_PROTOCOL_VERSION = 
HttpMethodParams.PROTOCOL_VERSION;


Reply via email to