Author: mriou
Date: Wed May 14 17:02:04 2008
New Revision: 656469

URL: http://svn.apache.org/viewvc?rev=656469&view=rev
Log:
Fixed request-only P2P invocation by giving it semantics closer to a reliable invoke.

Modified:
    ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java

Modified: 
ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java?rev=656469&r1=656468&r2=656469&view=diff
==============================================================================
--- 
ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java 
(original)
+++ 
ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java 
Wed May 14 17:02:04 2008
@@ -31,6 +31,7 @@
             String response = 
server.sendRequestFile("http://localhost:8080/ode/processes/MSMainExecuteService",
                     bundleName, "testRequest.soap");
 
+            System.out.println("->" + response);
             assertTrue(response.indexOf("OK") > 0);
         } finally {
             server.undeployProcess(bundleName);

Modified: 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=656469&r1=656468&r2=656469&view=diff
==============================================================================
--- 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 (original)
+++ 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
 Wed May 14 17:02:04 2008
@@ -293,7 +293,7 @@
                 // conditions for deadlock in the correlation tables. However 
if invocation style is transacted,
                 // we need to do the work right then and there.
 
-                if (istyle == InvocationStyle.TRANSACTED || istyle == 
InvocationStyle.P2P) {
+                if (istyle == InvocationStyle.TRANSACTED) {
                     doInstanceWork(mexdao.getInstance().getInstanceId(), new 
Callable<Void>() {
                         public Void call() {
                             
executeContinueInstanceMyRoleRequestReceived(mexdao);
@@ -1021,15 +1021,13 @@
      * @param runnable
      */
     void scheduleRunnable(final Runnable runnable) {
-        if (__log.isDebugEnabled())
-            __log.debug("schedulingRunnable for process " + _pid + ": " + 
runnable);
+        if (__log.isDebugEnabled()) __log.debug("schedulingRunnable for 
process " + _pid + ": " + runnable);
 
         _server.scheduleRunnable(new ProcessRunnable(runnable));
     }
 
     void enqueueRunnable(BpelInstanceWorker worker) {
-        if (__log.isDebugEnabled())
-            __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
+        if (__log.isDebugEnabled()) __log.debug("enqueuRunnable for process " 
+ _pid + ": " + worker);
 
         _server.enqueueRunnable(new ProcessRunnable(worker));
     }
@@ -1040,7 +1038,6 @@
         final String mexId = new GUID().toString();
         _hydrationLatch.latch(1);
         try {
-
             final PartnerLinkMyRoleImpl target = 
getPartnerLinkForService(targetService);
             if (target == null)
                 throw new BpelEngineException("NoSuchService: " + 
targetService);
@@ -1056,31 +1053,21 @@
     }
 
     void onMyRoleMexAck(MessageExchangeDAO mexdao, Status old) {
-
         if (mexdao.getPipedMessageExchangeId() != null) /* p2p */{
-
             BpelProcess caller = _server.getBpelProcess(mexdao.getPipedPID());
-            if (caller == null) {
-                // process no longer deployed....
-
-                return;
-            }
+            // process no longer deployed....
+            if (caller == null) return;
 
             MessageExchangeDAO pmex = 
caller.loadMexDao(mexdao.getPipedMessageExchangeId());
-            if (pmex == null) {
-                // Mex no longer there.... odd..
-
-                return;
-            }
+            // Mex no longer there.... odd..
+            if (pmex == null) return;
 
             // Need to copy the response and state from myrolemex --> 
partnerrolemex
-
             boolean compat = !(caller.isInMemory() ^ isInMemory());
             if (compat) {
                 // both processes are in-mem or both are persisted, can share 
the message
                 pmex.setResponse(mexdao.getResponse());
             } else /* one process in-mem, other persisted */{
-
                 MessageDAO presponse = 
pmex.createMessage(mexdao.getResponse().getType());
                 presponse.setData(mexdao.getResponse().getData());
                 pmex.setResponse(presponse);
@@ -1089,9 +1076,7 @@
             pmex.setAckType(mexdao.getAckType());
             pmex.setFailureType(mexdao.getFailureType());
 
-            if (old == Status.ASYNC)
-                caller.p2pWakeup(pmex);
-
+            if (old == Status.ASYNC) caller.p2pWakeup(pmex);
         } else /* not p2p */{
             // Do an Async wakeup if we are in the ASYNC state. If we're not, 
we'll pick up the ACK when we unwind
             // the stack.
@@ -1105,9 +1090,7 @@
                     __log.error("Integration layer threw an unexepcted 
exception.", t);
                 }
             }
-
         }
-
     }
 
     class ProcessRunnable implements Runnable {
@@ -1116,7 +1099,6 @@
         ProcessRunnable(Runnable work) {
             _work = work;
         }
-
         public void run() {
             _hydrationLatch.latch(1);
             try {
@@ -1124,9 +1106,7 @@
             } finally {
                 _hydrationLatch.release(1);
             }
-
         }
-
     }
 
     class ProcessCallable<T> implements Callable<T> {
@@ -1321,14 +1301,13 @@
      * @param partnerRoleMex
      */
     private void invokeP2P(BpelProcess target, QName serviceName, Operation 
operation, MessageExchangeDAO partnerRoleMex) {
-        if (BpelProcess.__log.isDebugEnabled()) {
-            __log
-                    .debug("Invoking in a p2p interaction, partnerrole " + 
partnerRoleMex.getMessageExchangeId() + " target="
-                            + target);
-        }
+        if (BpelProcess.__log.isDebugEnabled())
+            __log.debug("Invoking in a p2p interaction, partnerrole " + 
partnerRoleMex.getMessageExchangeId()
+                    + " target=" + target);
 
         partnerRoleMex.setInvocationStyle(InvocationStyle.P2P);
 
+        // Plumbing
         MessageExchangeDAO myRoleMex = target.createMessageExchange(new 
GUID().toString(),
                 MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
         myRoleMex.setStatus(Status.REQ);
@@ -1350,10 +1329,9 @@
         String mySessionId = partnerRoleMex.getPartnerLink().getMySessionId();
         String partnerSessionId = 
partnerRoleMex.getPartnerLink().getPartnerSessionId();
 
-        if (BpelProcess.__log.isDebugEnabled()) {
-            __log.debug("Setting myRoleMex session ids for p2p interaction, 
mySession " + partnerSessionId + " - partnerSess "
-                    + mySessionId);
-        }
+        if (BpelProcess.__log.isDebugEnabled())
+            __log.debug("Setting myRoleMex session ids for p2p interaction, 
mySession " + partnerSessionId
+                    + " - partnerSess " + mySessionId);
 
         if (mySessionId != null)
             
partnerRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, 
mySessionId);
@@ -1368,8 +1346,10 @@
         if (__log.isDebugEnabled())
             __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " 
partnerSessionId=" + partnerSessionId);
 
+        // A classic P2P interaction is considered reliable. The invocation 
should take place
+        // in the local transaction but the invoked process is not supposed to 
hold our thread
+        // and the reply should come in a separate transaction.
         target.invokeProcess(myRoleMex);
-
     }
 
     /**

Modified: 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=656469&r1=656468&r2=656469&view=diff
==============================================================================
--- 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 (original)
+++ 
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 Wed May 14 17:02:04 2008
@@ -710,7 +710,7 @@
         // we need to inject a message on the response channel, so that the 
process continues.
         switch (mexDao.getStatus()) {
         case ACK:
-            injectPartnerResponse(mexDao.getMessageExchangeId(), 
mexDao.getChannel());
+            if (mexDao.getChannel() != null) 
injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
             break;
         case ASYNC:
             // we'll have to wait for the response.


Reply via email to