Author: mszefler
Date: Mon Aug  6 17:05:59 2007
New Revision: 563350

URL: http://svn.apache.org/viewvc?view=rev&rev=563350
Log:
BART, some additional refactorings. New model to fix concurrency problems in 
Partner invokes.

Modified:
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
    
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=563350&r1=563349&r2=563350
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 Mon Aug  6 17:05:59 2007
@@ -735,6 +735,7 @@
 
         MessageDAO message = 
mexDao.createMessage(operation.getInput().getMessage().getQName());
         mexDao.setRequest(message);
+        mexDao.setTimeout(30000);
         message.setData(outgoingMessage);
         message.setType(operation.getInput().getMessage().getQName());
 
@@ -1011,7 +1012,7 @@
             throw new NullPointerException("Null mexId");
 
         if (BpelProcess.__log.isDebugEnabled()) {
-            __log.debug("Invoking message response for mexid " + mexid + " and 
channel " + responseChannelId);
+            __log.debug("<invoke> response for mexid " + mexid + " and channel 
" + responseChannelId);
         }
         _vpu.inject(new BpelJacobRunnable() {
             private static final long serialVersionUID = -1095444335740879981L;

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?view=diff&rev=563350&r1=563349&r2=563350
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
 Mon Aug  6 17:05:59 2007
@@ -47,7 +47,7 @@
  * @author Maciej Szefler <mszefler at gmail dot com>
  */
 class PartnerLinkPartnerRoleImpl extends PartnerLinkRoleImpl {
-    static final Log __log = LogFactory.getLog(BpelProcess.class);
+    static final Log __log = 
LogFactory.getLog(PartnerLinkPartnerRoleImpl.class);
 
     Endpoint _initialPartner;
 
@@ -100,8 +100,15 @@
     void invokeIL(MessageExchangeDAO mexDao) {
 
         Element partnerEprXml = mexDao.getEPR();
-        EndpointReference partnerEpr = partnerEprXml == null ? _initialEPR : 
_contexts.eprContext
-                .resolveEndpointReference(partnerEprXml);
+        EndpointReference partnerEpr = _initialEPR;
+        if (partnerEprXml != null) {
+            if (_contexts.eprContext != null)
+                partnerEpr = 
_contexts.eprContext.resolveEndpointReference(partnerEprXml);
+            else
+                __log.warn("Partner EPR will not be resolved, no EPR context 
specified!" );
+        }
+
+        
         EndpointReference myRoleEpr = null; // TODO: fix?
         Operation operation = 
_plinkDef.getPartnerRoleOperation(mexDao.getOperation());
         Set<InvocationStyle> supportedStyles = 
_contexts.mexContext.getSupportedInvocationStyle(_channel, partnerEpr);
@@ -158,6 +165,10 @@
      */
     private void invokeInMem(MessageExchangeDAO mexDao, EndpointReference 
partnerEpr, EndpointReference myRoleEpr,
             Operation operation, Set<InvocationStyle> supportedStyles, boolean 
oneway) {
+        if (__log.isDebugEnabled())
+            __log.debug("invokeInMem: mexid=" + mexDao.getMessageExchangeId() 
+" operation=" + mexDao.getOperation() +" oneway=" + oneway);
+
+        
         // In-memory processes are a bit different, we're never going to do 
any scheduling for them, so we'd
         // prefer to have TRANSACTED invocation style.
         if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
@@ -174,24 +185,26 @@
             Transaction tx;
             try {
                 tx = _contexts.txManager.suspend();
+                __log.debug("TX " + tx + " suspended for in-memory invoke. ");
             } catch (Exception ex) {
                 throw new BpelEngineException("TxManager Error: cannot 
suspend!", ex);
             }
 
             try {
                 unreliableMex.setState(State.INVOKE_XXX);
-                _contexts.mexContext.invokePartnerBlocking(unreliableMex);
+                _contexts.mexContext.invokePartnerUnreliable(unreliableMex);
                 try {
                     unreliableMex.waitForAck(mexDao.getTimeout());
                 } catch (InterruptedException ie) {
-                    ;
-                    ; // ignore
+                    __log.warn("Interrupted waiting for MEX response.");
+
                 }
 
             } finally {
                 unreliableMex.setState(State.DEAD);
                 try {
                     _contexts.txManager.resume(tx);
+                    __log.debug("TX " + tx + " resumed for in-memory invoke. 
");
                 } catch (Exception e) {
                     throw new BpelEngineException("TxManager Error: cannot 
resume!", e);
                 }
@@ -302,7 +315,7 @@
             Status status;
             _unreliableMex.setState(State.INVOKE_XXX);
             try {
-                _contexts.mexContext.invokePartnerBlocking(_unreliableMex);
+                _contexts.mexContext.invokePartnerUnreliable(_unreliableMex);
                 _unreliableMex.setState(State.HOLD);
             } catch (Throwable t) {
                 _unreliableMex.setState(State.DEAD);

Modified: 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=563350&r1=563349&r2=563350
==============================================================================
--- 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
 (original)
+++ 
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
 Mon Aug  6 17:05:59 2007
@@ -83,7 +83,7 @@
         if (_done) 
             return getStatus();
 
-        Future<Status> future = _future != null ? _future : 
super.invokeAsync();
+        Future<Status> future = _future != null ? _future : invokeAsync();
         
         try {
             future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);


Reply via email to