Author: seanahn
Date: Thu Oct 15 23:44:34 2009
New Revision: 825703
URL: http://svn.apache.org/viewvc?rev=825703&view=rev
Log:
ode-680, Do not send back the response right away when a transaction fails
Added:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
Modified:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Modified:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?rev=825703&r1=825702&r2=825703&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Thu Oct 15 23:44:34 2009
@@ -54,7 +54,7 @@
protected BpelProcess _process;
- private static Map<String, ResponseCallback> _waitingCallbacks =
+ protected static Map<String, ResponseCallback> _waitingCallbacks =
new ConcurrentHashMap<String, ResponseCallback>();
public MyRoleMessageExchangeImpl(BpelProcess process, BpelEngineImpl
engine, MessageExchangeDAO mexdao) {
@@ -186,7 +186,7 @@
public void release(boolean instanceSucceeded) {
if(__log.isDebugEnabled()) __log.debug("Releasing mex " +
getMessageExchangeId());
if (_process != null) {
-
_dao.release(_process.isCleanupCategoryEnabled(instanceSucceeded,
CLEANUP_CATEGORY.MESSAGES));
+ _dao.release(_process.isCleanupCategoryEnabled(instanceSucceeded,
CLEANUP_CATEGORY.MESSAGES));
}
_dao = null;
}
@@ -197,21 +197,21 @@
* @param message
* @return
*/
- protected Message cloneMessage(Message message) {
- Message clone = createMessage(message.getType());
- clone.setMessage((Element)
message.getMessage().cloneNode(true));
- Map<String, Node> headerParts = message.getHeaderParts();
- for (String partName : headerParts.keySet()) {
- clone.setHeaderPart(partName, (Element)
headerParts.get(partName).cloneNode(true));
- }
- Map<String, Node> parts = message.getHeaderParts();
- for (String partName : parts.keySet()) {
- clone.setHeaderPart(partName, (Element)
parts.get(partName).cloneNode(true));
- }
- return clone;
- }
+ protected Message cloneMessage(Message message) {
+ Message clone = createMessage(message.getType());
+ clone.setMessage((Element) message.getMessage().cloneNode(true));
+ Map<String, Node> headerParts = message.getHeaderParts();
+ for (String partName : headerParts.keySet()) {
+ clone.setHeaderPart(partName, (Element)
headerParts.get(partName).cloneNode(true));
+ }
+ Map<String, Node> parts = message.getHeaderParts();
+ for (String partName : parts.keySet()) {
+ clone.setHeaderPart(partName, (Element)
parts.get(partName).cloneNode(true));
+ }
+ return clone;
+ }
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("unchecked")
static class ResponseFuture implements Future {
private String _clientId;
private boolean _done = false;
@@ -249,13 +249,18 @@
}
}
+ @Override
protected void responseReceived() {
final String cid = getClientId();
_engine._contexts.scheduler.registerSynchronizer(new
Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
__log.debug("Received myrole mex response callback");
- ResponseCallback callback = _waitingCallbacks.remove(cid);
- if (callback != null) callback.responseReceived();
+ if( success ) {
+ ResponseCallback callback = _waitingCallbacks.remove(cid);
+ if (callback != null) callback.responseReceived();
+ } else {
+ __log.warn("Transaction is rolled back on sending back the
response.");
+ }
}
public void beforeCompletion() {
}
@@ -288,5 +293,4 @@
_timedout = _waiting;
}
}
-
}
Added:
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java?rev=825703&view=auto
==============================================================================
---
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
(added)
+++
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
Thu Oct 15 23:44:34 2009
@@ -0,0 +1,91 @@
+package org.apache.ode.bpel.engine;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.transaction.TransactionManager;
+
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.engine.MyRoleMessageExchangeImpl.ResponseCallback;
+import org.apache.ode.scheduler.simple.SimpleScheduler;
+import org.jmock.Mock;
+import org.jmock.MockObjectTestCase;
+
+public class MyRoleMessageExchangeImplTest extends MockObjectTestCase {
+ private Mock mexDao;
+
+ private TestMyRoleMessageExchangeImpl myRoleMexImpl;
+ Contexts contexts;
+ BpelEngineImpl engine;
+ TransactionManager _txm;
+
+ public void testResponseReceived() throws Exception {
+
mexDao.expects(exactly(3)).method("getCorrelationId").will(returnValue("corrId"));
+
+ final boolean[] responded = new boolean[1];
+ myRoleMexImpl.callbacks().put("corrId", new ResponseCallback() {
+ synchronized boolean responseReceived() {
+ responded[0] = true;
+ return true;
+ }
+
+ synchronized void waitResponse(long timeout) {
+ }
+ });
+
+ _txm.begin();
+ myRoleMexImpl.responseReceived();
+ _txm.rollback();
+
+ _txm.begin();
+ myRoleMexImpl.responseReceived();
+ _txm.rollback();
+
+ _txm.begin();
+ myRoleMexImpl.responseReceived();
+ _txm.commit();
+
+ assertTrue(responded[0]);
+ }
+
+ public void testResponseTimeout() throws Exception {
+
mexDao.expects(atLeastOnce()).method("getCorrelationId").will(returnValue("corrId"));
+ myRoleMexImpl.callbacks().put("corrId", new
MyRoleMessageExchangeImpl.ResponseCallback());
+
+ _txm.begin();
+ myRoleMexImpl.responseReceived();
+ _txm.rollback();
+
+ try {
+ new MyRoleMessageExchangeImpl.ResponseFuture("corrId").get(10,
TimeUnit.MILLISECONDS);
+ fail("Should throw a TimeoutException!!");
+ } catch( TimeoutException te ) {}
+ }
+
+ protected void setUp() throws Exception {
+ _txm = new GeronimoTransactionManager();
+
+ mexDao = new Mock(MessageExchangeDAO.class);
+ SimpleScheduler scheduler = new SimpleScheduler("node", null, new
Properties());
+ scheduler.setTransactionManager(_txm);
+
+ contexts = new Contexts();
+ contexts.scheduler = scheduler;
+ engine = new BpelEngineImpl(contexts);
+
+ myRoleMexImpl = new TestMyRoleMessageExchangeImpl();
+ }
+
+ class TestMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
+ public TestMyRoleMessageExchangeImpl() {
+ super(null, engine, (MessageExchangeDAO)mexDao.proxy());
+ }
+
+ public Map<String, ResponseCallback> callbacks() {
+ return _waitingCallbacks;
+ }
+ }
+}