Author: seanahn
Date: Thu Oct 15 23:44:34 2009
New Revision: 825703

URL: http://svn.apache.org/viewvc?rev=825703&view=rev
Log:
ode-680, Do not send back the response right away when a transaction fail

Added:
    
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
Modified:
    
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?rev=825703&r1=825702&r2=825703&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
 Thu Oct 15 23:44:34 2009
@@ -54,7 +54,7 @@
     
     protected BpelProcess _process;
 
-    private static Map<String, ResponseCallback> _waitingCallbacks =
+    protected static Map<String, ResponseCallback> _waitingCallbacks =
             new ConcurrentHashMap<String, ResponseCallback>();
 
     public MyRoleMessageExchangeImpl(BpelProcess process, BpelEngineImpl 
engine, MessageExchangeDAO mexdao) {
@@ -186,7 +186,7 @@
     public void release(boolean instanceSucceeded) {
         if(__log.isDebugEnabled()) __log.debug("Releasing mex " + 
getMessageExchangeId());
         if (_process != null) {
-               
_dao.release(_process.isCleanupCategoryEnabled(instanceSucceeded, 
CLEANUP_CATEGORY.MESSAGES));
+            _dao.release(_process.isCleanupCategoryEnabled(instanceSucceeded, 
CLEANUP_CATEGORY.MESSAGES));
         }
         _dao = null;
     }
@@ -197,21 +197,21 @@
      * @param message
      * @return
      */
-       protected Message cloneMessage(Message message) {
-               Message clone = createMessage(message.getType());
-               clone.setMessage((Element) 
message.getMessage().cloneNode(true));
-               Map<String, Node> headerParts = message.getHeaderParts();
-               for (String partName : headerParts.keySet()) {
-                       clone.setHeaderPart(partName, (Element) 
headerParts.get(partName).cloneNode(true)); 
-               }
-               Map<String, Node> parts = message.getHeaderParts();
-               for (String partName : parts.keySet()) {
-                       clone.setHeaderPart(partName, (Element) 
parts.get(partName).cloneNode(true)); 
-               }
-               return clone;
-       }
+    protected Message cloneMessage(Message message) {
+        Message clone = createMessage(message.getType());
+        clone.setMessage((Element) message.getMessage().cloneNode(true));
+        Map<String, Node> headerParts = message.getHeaderParts();
+        for (String partName : headerParts.keySet()) {
+            clone.setHeaderPart(partName, (Element) 
headerParts.get(partName).cloneNode(true)); 
+        }
+        Map<String, Node> parts = message.getHeaderParts();
+        for (String partName : parts.keySet()) {
+            clone.setHeaderPart(partName, (Element) 
parts.get(partName).cloneNode(true)); 
+        }
+        return clone;
+    }
     
-       @SuppressWarnings("unchecked")
+    @SuppressWarnings("unchecked")
     static class ResponseFuture implements Future {
         private String _clientId;
         private boolean _done = false;
@@ -249,13 +249,18 @@
         }
     }
 
+    @Override
     protected void responseReceived() {
         final String cid = getClientId();
         _engine._contexts.scheduler.registerSynchronizer(new 
Scheduler.Synchronizer() {
             public void afterCompletion(boolean success) {
                 __log.debug("Received myrole mex response callback");
-                ResponseCallback callback = _waitingCallbacks.remove(cid);
-                if (callback != null) callback.responseReceived();
+                if( success ) {
+                    ResponseCallback callback = _waitingCallbacks.remove(cid);
+                    if (callback != null) callback.responseReceived();
+                } else {
+                    __log.warn("Transaction is rolled back on sending back the 
response.");
+                }
             }
             public void beforeCompletion() {
             }
@@ -288,5 +293,4 @@
             _timedout = _waiting;
         }
     }
-
 }

Added: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java?rev=825703&view=auto
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
 (added)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
 Thu Oct 15 23:44:34 2009
@@ -0,0 +1,91 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.transaction.TransactionManager;
+
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.engine.MyRoleMessageExchangeImpl.ResponseCallback;
+import org.apache.ode.scheduler.simple.SimpleScheduler;
+import org.jmock.Mock;
+import org.jmock.MockObjectTestCase;
+
+public class MyRoleMessageExchangeImplTest extends MockObjectTestCase {
+    private Mock mexDao;
+    
+    private TestMyRoleMessageExchangeImpl myRoleMexImpl;
+    Contexts contexts;
+    BpelEngineImpl engine;
+    TransactionManager _txm;
+    
+    public void testResponseReceived() throws Exception {
+        
mexDao.expects(exactly(3)).method("getCorrelationId").will(returnValue("corrId"));
+        
+        final boolean[] responded = new boolean[1];
+        myRoleMexImpl.callbacks().put("corrId", new ResponseCallback() {
+            synchronized boolean responseReceived() {
+                responded[0] = true;
+                return true;
+            }
+
+            synchronized void waitResponse(long timeout) {
+            }
+        });
+        
+        _txm.begin();
+        myRoleMexImpl.responseReceived();
+        _txm.rollback();
+        
+        _txm.begin();
+        myRoleMexImpl.responseReceived();
+        _txm.rollback();
+        
+        _txm.begin();
+        myRoleMexImpl.responseReceived();
+        _txm.commit();
+        
+        assertTrue(responded[0]);
+    }
+    
+    public void testResponseTimeout() throws Exception {
+        
mexDao.expects(atLeastOnce()).method("getCorrelationId").will(returnValue("corrId"));
+        myRoleMexImpl.callbacks().put("corrId", new 
MyRoleMessageExchangeImpl.ResponseCallback());
+
+        _txm.begin();
+        myRoleMexImpl.responseReceived();
+        _txm.rollback();
+
+        try {
+            new MyRoleMessageExchangeImpl.ResponseFuture("corrId").get(10, 
TimeUnit.MILLISECONDS);
+            fail("Should throw a TimeoutException!!");
+        } catch( TimeoutException te ) {}
+    }
+    
+    protected void setUp() throws Exception {
+        _txm = new GeronimoTransactionManager();
+        
+        mexDao = new Mock(MessageExchangeDAO.class);
+        SimpleScheduler scheduler = new SimpleScheduler("node", null, new 
Properties());
+        scheduler.setTransactionManager(_txm);
+        
+        contexts = new Contexts();
+        contexts.scheduler = scheduler;
+        engine = new BpelEngineImpl(contexts);
+
+        myRoleMexImpl = new TestMyRoleMessageExchangeImpl();
+    }
+
+    class TestMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
+        public TestMyRoleMessageExchangeImpl() {
+            super(null, engine, (MessageExchangeDAO)mexDao.proxy());
+        }
+        
+        public Map<String, ResponseCallback> callbacks() {
+            return _waitingCallbacks;
+        }
+    }
+}


Reply via email to