Author: mszefler
Date: Mon Aug 6 17:05:59 2007
New Revision: 563350
URL: http://svn.apache.org/viewvc?view=rev&rev=563350
Log:
BART, some additinal refactorings. New model to fix concurrency problems in
Partner invokes.
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=563350&r1=563349&r2=563350
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
Mon Aug 6 17:05:59 2007
@@ -735,6 +735,7 @@
MessageDAO message =
mexDao.createMessage(operation.getInput().getMessage().getQName());
mexDao.setRequest(message);
+ mexDao.setTimeout(30000);
message.setData(outgoingMessage);
message.setType(operation.getInput().getMessage().getQName());
@@ -1011,7 +1012,7 @@
throw new NullPointerException("Null mexId");
if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("Invoking message response for mexid " + mexid + " and
channel " + responseChannelId);
+ __log.debug("<invoke> response for mexid " + mexid + " and channel
" + responseChannelId);
}
_vpu.inject(new BpelJacobRunnable() {
private static final long serialVersionUID = -1095444335740879981L;
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?view=diff&rev=563350&r1=563349&r2=563350
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
Mon Aug 6 17:05:59 2007
@@ -47,7 +47,7 @@
* @author Maciej Szefler <mszefler at gmail dot com>
*/
class PartnerLinkPartnerRoleImpl extends PartnerLinkRoleImpl {
- static final Log __log = LogFactory.getLog(BpelProcess.class);
+ static final Log __log =
LogFactory.getLog(PartnerLinkPartnerRoleImpl.class);
Endpoint _initialPartner;
@@ -100,8 +100,15 @@
void invokeIL(MessageExchangeDAO mexDao) {
Element partnerEprXml = mexDao.getEPR();
- EndpointReference partnerEpr = partnerEprXml == null ? _initialEPR :
_contexts.eprContext
- .resolveEndpointReference(partnerEprXml);
+ EndpointReference partnerEpr = _initialEPR;
+ if (partnerEprXml != null) {
+ if (_contexts.eprContext != null)
+ partnerEpr =
_contexts.eprContext.resolveEndpointReference(partnerEprXml);
+ else
+ __log.warn("Partner EPR will not be resolved, no EPR context
specified!" );
+ }
+
+
EndpointReference myRoleEpr = null; // TODO: fix?
Operation operation =
_plinkDef.getPartnerRoleOperation(mexDao.getOperation());
Set<InvocationStyle> supportedStyles =
_contexts.mexContext.getSupportedInvocationStyle(_channel, partnerEpr);
@@ -158,6 +165,10 @@
*/
private void invokeInMem(MessageExchangeDAO mexDao, EndpointReference
partnerEpr, EndpointReference myRoleEpr,
Operation operation, Set<InvocationStyle> supportedStyles, boolean
oneway) {
+ if (__log.isDebugEnabled())
+ __log.debug("invokeInMem: mexid=" + mexDao.getMessageExchangeId()
+" operation=" + mexDao.getOperation() +" oneway=" + oneway);
+
+
// In-memory processes are a bit different, we're never going to do
any scheduling for them, so we'd
// prefer to have TRANSACTED invocation style.
if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
@@ -174,24 +185,26 @@
Transaction tx;
try {
tx = _contexts.txManager.suspend();
+ __log.debug("TX " + tx + " suspended for in-memory invoke. ");
} catch (Exception ex) {
throw new BpelEngineException("TxManager Error: cannot
suspend!", ex);
}
try {
unreliableMex.setState(State.INVOKE_XXX);
- _contexts.mexContext.invokePartnerBlocking(unreliableMex);
+ _contexts.mexContext.invokePartnerUnreliable(unreliableMex);
try {
unreliableMex.waitForAck(mexDao.getTimeout());
} catch (InterruptedException ie) {
- ;
- ; // ignore
+ __log.warn("Interrupted waiting for MEX response.");
+
}
} finally {
unreliableMex.setState(State.DEAD);
try {
_contexts.txManager.resume(tx);
+ __log.debug("TX " + tx + " resumed for in-memory invoke.
");
} catch (Exception e) {
throw new BpelEngineException("TxManager Error: cannot
resume!", e);
}
@@ -302,7 +315,7 @@
Status status;
_unreliableMex.setState(State.INVOKE_XXX);
try {
- _contexts.mexContext.invokePartnerBlocking(_unreliableMex);
+ _contexts.mexContext.invokePartnerUnreliable(_unreliableMex);
_unreliableMex.setState(State.HOLD);
} catch (Throwable t) {
_unreliableMex.setState(State.DEAD);
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=563350&r1=563349&r2=563350
==============================================================================
---
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
(original)
+++
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
Mon Aug 6 17:05:59 2007
@@ -83,7 +83,7 @@
if (_done)
return getStatus();
- Future<Status> future = _future != null ? _future :
super.invokeAsync();
+ Future<Status> future = _future != null ? _future : invokeAsync();
try {
future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);