Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=773939&r1=773938&r2=773939&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue May 12 15:23:30 2009
@@ -21,10 +21,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 
 import javax.wsdl.Operation;
 import javax.xml.namespace.QName;
@@ -52,11 +49,7 @@
 import org.apache.ode.bpel.evt.ProcessMessageExchangeEvent;
 import org.apache.ode.bpel.evt.ProcessTerminationEvent;
 import org.apache.ode.bpel.evt.ScopeEvent;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.EndpointReference;
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.iapi.MessageExchange.AckType;
 import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
@@ -84,6 +77,10 @@
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
+import org.apache.ode.bpel.rapi.*;
+import org.apache.ode.bpel.rapi.Resource;
+
+import org.w3c.dom.*;
 
 class BpelRuntimeContextImpl implements OdeRTInstanceContext {
 
@@ -137,7 +134,7 @@
         return "{BpelRuntimeCtx PID=" + _bpelProcess.getPID() + ", IID=" + _iid + "}";
     }
 
-    public Long getPid() {
+    public Long getInstanceId() {
         return _iid;
     }
 
@@ -177,7 +174,7 @@
     public boolean isPartnerRoleEndpointInitialized(PartnerLink pLink) {
         PartnerLinkDAO spl = fetchPartnerLinkDAO(pLink);
 
-        return spl.getPartnerEPR() != null || _bpelProcess.getInitialPartnerRoleEPR(pLink.getModel()) != null;
+        return spl.getPartnerEPR() != null || ((ODEWSProcess)_bpelProcess).getInitialPartnerRoleEPR(pLink.getModel()) != null;
     }
 
     public void completedFault(FaultInfo faultData) {
@@ -188,6 +185,8 @@
         _dao.setFault(faultData.getFaultName(), faultData.getExplanation(), 
faultData.getFaultLineNo(),
                 faultData.getActivityId(), faultData.getFaultMessage());
 
+        cleanupResourceRoutes();
+
         // send event
         ProcessInstanceStateChangeEvent evt = new 
ProcessInstanceStateChangeEvent();
         evt.setOldState(_dao.getState());
@@ -206,6 +205,8 @@
             ODEProcess.__log.debug("ProcessImpl " + _bpelProcess.getPID() + " 
completed OK.");
         }
 
+        cleanupResourceRoutes();
+
         // send event
         ProcessInstanceStateChangeEvent evt = new 
ProcessInstanceStateChangeEvent();
         evt.setOldState(_dao.getState());
@@ -249,6 +250,31 @@
         }
     }
 
+    public void initializeResource(Long parentScopeId, ResourceModel resource, String url) {
+        ScopeDAO parent = _dao.getScope(parentScopeId);
+        // Storing the resource as a variable
+        XmlDataDAO resourceData = parent.getVariable(resource.getName());
+        Document doc = DOMUtils.newDocument();
+        resourceData.set(doc.createTextNode(url));
+    }
+
+    public void initializeInstantiatingUrl(String url) {
+        _dao.setInstantiatingUrl(url);
+        _instantiatingMessageExchange.setResource(url + "~POST");
+    }
+
+    public String getInstantiatingUrl() {
+        return _dao.getInstantiatingUrl();
+    }
+
+    public String readResource(Long parentScopeId, ResourceModel resource) {
+        ScopeDAO parent = _dao.getScope(parentScopeId);
+        XmlDataDAO resourceData = parent.getVariable(resource.getName());
+        Node resourceNode = resourceData.get();
+        if (resourceData.isNull()) return null;
+        else return ((Text)resourceNode).getWholeText();
+    }
+
     public void select(String selectChannelId, Date timeout, Selector[] 
selectors) {
         if (ODEProcess.__log.isTraceEnabled())
             
ODEProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("select", new 
Object[] { "pickResponseChannel",
@@ -314,6 +340,39 @@
         }
     }
 
+    public void checkResourceRoute(Resource resourceInstance, String pickResponseChannel, int selectorIdx) {
+        // check if this is first pick
+        if (_dao.getState() == ProcessState.STATE_NEW) {
+            // send event
+            ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
+            evt.setOldState(ProcessState.STATE_NEW);
+            _dao.setState(ProcessState.STATE_READY);
+            evt.setNewState(ProcessState.STATE_READY);
+            sendEvent(evt);
+        }
+
+        String method = resourceInstance.getModel().getMethod();
+        if (_instantiatingMessageExchange != null && method.equals("POST") && _dao.getState() == ProcessState.STATE_READY)
+            injectMyRoleMessageExchange(pickResponseChannel, selectorIdx, _instantiatingMessageExchange);
+        else {
+            String url = readResource(resourceInstance.getScopeInstanceId(), resourceInstance.getModel());
+            _dao.createResourceRoute(url, method, pickResponseChannel, selectorIdx);
+            org.apache.ode.bpel.iapi.Resource res = new org.apache.ode.bpel.iapi.Resource(url, "application/xml", method);
+            _bpelProcess._contexts.bindingContext.activateProvidedResource(res);
+        }
+
+        // TODO schedule a matcher to see if the message arrived already
+    }
+
+    private void cleanupResourceRoutes() {
+        Set<String> routes = _dao.getAllResourceRoutes();
+        for (String route : routes) {
+            String[] resArr = route.split("~");
+            org.apache.ode.bpel.iapi.Resource res = new org.apache.ode.bpel.iapi.Resource(resArr[0], "application/xml", resArr[1]);
+            _bpelProcess._contexts.bindingContext.deactivateProvidedResource(res);
+        }
+    }
+
     public CorrelationKey readCorrelation(CorrelationSet cset) {
         ScopeDAO scopeDAO = _dao.getScope(cset.getScopeId());
         CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.getName());
@@ -325,7 +384,7 @@
         PartnerLinkDAO pl = fetchPartnerLinkDAO(pLink);
         Element epr = pl.getPartnerEPR();
         if (epr == null) {
-            EndpointReference e = _bpelProcess.getInitialPartnerRoleEPR(pLink.getModel());
+            EndpointReference e = ((ODEWSProcess)_bpelProcess).getInitialPartnerRoleEPR(pLink.getModel());
             if (e != null)
                 epr = e.toXML().getDocumentElement();
         }
@@ -334,7 +393,7 @@
     }
 
     public Element fetchMyRoleEndpointReferenceData(PartnerLink pLink) {
-        return _bpelProcess.getInitialMyRoleEPR(pLink.getModel()).toXML().getDocumentElement();
+        return ((ODEWSProcess)_bpelProcess).getInitialMyRoleEPR(pLink.getModel()).toXML().getDocumentElement();
     }
 
     private PartnerLinkDAO fetchPartnerLinkDAO(PartnerLink pLink) {
@@ -417,7 +476,7 @@
         evt.setPortType(plink.getModel().getMyRolePortType().getQName());
 
         // Get the "my-role" mex from the DB.
-        MessageExchangeDAO myrolemex = _dao.getConnection().getMessageExchange(mexId);
+        MessageExchangeDAO myrolemex = getExistingMex(mexId);
 
         Operation operation = plink.getModel().getMyRoleOperation(opName);
         if (operation == null || operation.getOutput() == null) throw new 
NoSuchOperationException();
@@ -443,7 +502,7 @@
         myrolemex.setStatus(Status.ACK);
         myrolemex.setAckType(ackType);
         try {
-            _bpelProcess.onMyRoleMexAck(myrolemex, previousStatus);
+            ((ODEWSProcess)_bpelProcess).onMyRoleMexAck(myrolemex, previousStatus);
         } finally {
             if (myrolemex.getPipedMessageExchangeId() != null) {
                 
myrolemex.release(_bpelProcess.isCleanupCategoryEnabled(myrolemex.getAckType() 
== MessageExchange.AckType.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
@@ -452,6 +511,42 @@
         sendEvent(evt);
     }
 
+    public void reply(String mexId, Resource resource, Element msg, QName fault) throws NoSuchOperationException {
+        // prepare event
+        ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
+        evt.setMexId(mexId);
+        evt.setResource(resource.getName());
+
+        MessageExchangeDAO mex = getExistingMex(mexId);
+        MessageDAO message = mex.createMessage(null);
+        buildOutgoingMessage(message, msg);
+        mex.setResponse(message);
+
+        AckType ackType;
+        if (fault != null) {
+            ackType = AckType.FAULT;
+            mex.setFault(fault);
+            evt.setAspect(ProcessMessageExchangeEvent.PROCESS_FAULT);
+        } else {
+            ackType = AckType.RESPONSE;
+            evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
+        }
+
+        String url = readResource(resource.getScopeInstanceId(), resource.getModel());
+
+        Status previousStatus = mex.getStatus();
+        mex.setStatus(Status.ACK);
+        mex.setAckType(ackType);
+        try {
+            ((ODERESTProcess)_bpelProcess).onRestMexAck(mex, previousStatus, url);
+        } finally {
+            if (mex.getPipedMessageExchangeId() != null) {
+                mex.release(_bpelProcess.isCleanupCategoryEnabled(mex.getAckType() == MessageExchange.AckType.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
+            }
+        }
+        sendEvent(evt);
+    }
+
     public void writeCorrelation(CorrelationSet cset, QName[] propNames, 
CorrelationKey correlation) throws FaultException {
         // enforce unique correlation set constraint
         ProcessDAO processDAO = _dao.getProcess();
@@ -494,7 +589,6 @@
     }
 
     private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey 
key) {
-
         WorkEvent we = new WorkEvent();
         we.setIID(_dao.getInstanceId());
         we.setProcessId(_bpelProcess.getPID());
@@ -507,7 +601,6 @@
     public String invoke(String requestId, PartnerLink partnerLink, Operation 
operation, Element outgoingMessage)
             throws UninitializedPartnerEPR {
 
-        // TODO: think we should move the dao creation into bpelprocess --mbs
         MessageExchangeDAO mexDao = 
_dao.getConnection().createMessageExchange(new GUID().toString(),
                 MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
         mexDao.setStatus(MessageExchange.Status.REQ);
@@ -515,7 +608,7 @@
         
mexDao.setPortType(partnerLink.getModel().getPartnerRolePortType().getQName());
         mexDao.setPartnerLinkModelId(partnerLink.getModel().getId());
 
-        PartnerRoleChannel partnerRoleChannel = 
_bpelProcess.getPartnerRoleChannel(partnerLink.getModel());
+        PartnerRoleChannel partnerRoleChannel = 
((ODEWSProcess)_bpelProcess).getPartnerRoleChannel(partnerLink.getModel());
         PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink);
 
         Element partnerEPR = plinkDAO.getPartnerEPR();
@@ -580,6 +673,52 @@
         return mexDao.getMessageExchangeId();
     }
 
+    public String invoke(String requestId, org.apache.ode.bpel.iapi.Resource resource, Element outgoingMessage) {
+
+        MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),
+                MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
+        mexDao.setStatus(MessageExchange.Status.REQ);
+        mexDao.setResource(resource.getUrl() + "~" + resource.getMethod());
+        mexDao.setProcess(_dao.getProcess());
+        mexDao.setInstance(_dao);
+        mexDao.setPattern(MessageExchangePattern.REQUEST_RESPONSE);
+        mexDao.setChannel(requestId);
+
+        if (outgoingMessage != null) {
+            MessageDAO message = mexDao.createMessage(null);
+            mexDao.setRequest(message);
+            mexDao.setTimeout(30000);
+            message.setData(outgoingMessage);
+        }
+
+        // prepare event
+        ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
+        evt.setResource(resource.getUrl() + "~" + resource.getMethod());
+        evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
+        evt.setMexId(mexDao.getMessageExchangeId());
+        sendEvent(evt);
+
+        if (__log.isDebugEnabled())
+            __log.debug("INVOKING PARTNER: resource=" + resource + " channel=" + requestId + ")");
+
+        _bpelProcess.invokePartner(mexDao);
+
+        // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations,
+        // we need to inject a message on the response channel, so that the process continues.
+        switch (mexDao.getStatus()) {
+        case ACK:
+            if (mexDao.getChannel() != null) injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
+            break;
+        case ASYNC:
+            // we'll have to wait for the response.
+            break;
+        default:
+            throw new AssertionError("Unexpected MEX status: " + mexDao.getStatus());
+        }
+
+        return mexDao.getMessageExchangeId();
+    }
+
     private void buildOutgoingMessage(MessageDAO message, Element 
outgoingElmt) {
         if (outgoingElmt == null) return;
         
@@ -728,7 +867,7 @@
             __log.debug("<invoke> response for mexid " + mexid + " and channel 
" + invokeId);
         }
 
-        MessageExchangeDAO mex = 
_dao.getConnection().getMessageExchange(mexid);
+        MessageExchangeDAO mex = getExistingMex(mexid);
 
         ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
         evt.setPortType(mex.getPortType());
@@ -751,6 +890,12 @@
             irt = OdeRTInstance.InvokeResponseType.FAILURE;
             evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAILURE);
             break;
+        case ONEWAY:
+            // A ws-style one-way invoke won't even go there as there's no 
response channel, only used
+            // for rest style where you get a 204 after the fact.
+            irt = OdeRTInstance.InvokeResponseType.REPLY;
+            evt.setAspect(ProcessMessageExchangeEvent.PARTNER_OUTPUT);
+            break;
         default:
             String msg = "Invalid response state for mex " + mexid + ": " + 
status;
             __log.error(msg);
@@ -766,7 +911,7 @@
         event.setProcessId(_dao.getProcess().getProcessId());
         event.setProcessName(_dao.getProcess().getType());
         event.setProcessInstanceId(_dao.getInstanceId());
-        _bpelProcess._debugger.onEvent(event);
+        if (_bpelProcess._debugger != null) 
_bpelProcess._debugger.onEvent(event);
 
         //filter events
         List<String> scopeNames = null;
@@ -794,7 +939,7 @@
             }
             mexDao.setFaultExplanation("Process did not respond.");
             mexDao.setStatus(Status.ACK);
-            _bpelProcess.onMyRoleMexAck(mexDao, status);
+            ((ODEWSProcess)_bpelProcess).onMyRoleMexAck(mexDao, status);
         }
     }
 
@@ -803,18 +948,12 @@
     }
 
     public Element getMyRequest(String mexId) {
-        MessageExchangeDAO dao = 
_dao.getConnection().getMessageExchange(mexId);
-        if (dao == null) {
-            // this should not happen....
-            String msg = "Engine requested non-existent message exchange: " + 
mexId;
-            __log.fatal(msg);
-            throw new BpelEngineException(msg);
-        }
+        MessageExchangeDAO dao = getExistingMex(mexId);
 
         if (dao.getDirection() != 
MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE) {
             // this should not happen....
             String msg = "Engine requested my-role request for a partner-role 
mex: " + mexId;
-            __log.fatal(msg);
+            __log.error(msg);
             throw new BpelEngineException(msg);
         }
 
@@ -822,13 +961,23 @@
         if (request == null) {
             // this also should not happen
             String msg = "Engine requested request for message exchange that 
did not have one: " + mexId;
-            __log.fatal(msg);
+            __log.error(msg);
             throw new BpelEngineException(msg);
         }
 
         return mergeHeaders(request);
     }
 
+    public Map<String,String> getProperties(String mexId) {
+        MessageExchangeDAO dao = getExistingMex(mexId);
+        return dao.getProperties();
+    }
+
+    public void setInstantiatingMex(String mexId) {
+        MessageExchangeDAO mex = getExistingMex(mexId);
+        mex.setInstantiatingResource(true);
+    }
+
     private Element mergeHeaders(MessageDAO msg) {
         // Merging header data, it's all stored in the same variable
         Element data = msg.getData();
@@ -852,13 +1001,7 @@
     }
 
     public QName getPartnerFault(String mexId) {
-        MessageExchangeDAO dao = 
_dao.getConnection().getMessageExchange(mexId);
-        if (dao == null) {
-            // this should not happen....
-            String msg = "Engine requested non-existent message exchange: " + 
mexId;
-            __log.fatal(msg);
-            throw new BpelEngineException(msg);
-        }
+        MessageExchangeDAO dao = getExistingMex(mexId);
         return dao.getFault();
     }
 
@@ -872,13 +1015,7 @@
     }
 
     private MessageDAO _getPartnerResponse(String mexId) {
-        MessageExchangeDAO dao = 
_dao.getConnection().getMessageExchange(mexId);
-        if (dao == null) {
-            // this should not happen....
-            String msg = "Engine requested non-existent message exchange: " + 
mexId;
-            __log.fatal(msg);
-            throw new BpelEngineException(msg);
-        }
+        MessageExchangeDAO dao = getExistingMex(mexId);
         if (dao.getDirection() != 
MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE) {
             // this should not happen....
             String msg = "Engine requested partner response for a my-role mex: 
" + mexId;
@@ -912,22 +1049,22 @@
 
 
     public Element getSourceEPR(String mexId) {
-        MessageExchangeDAO dao = 
_dao.getConnection().getMessageExchange(mexId);
-        String epr = 
dao.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_EPR);
+        MessageExchangeDAO dao = getExistingMex(mexId);
+        String epr = 
dao.getProperty(WSMessageExchange.PROPERTY_SEP_PARTNERROLE_EPR);
         if (epr == null)
             return null;
         try {
             return DOMUtils.stringToDOM(epr);
         } catch (Exception ex) {
-            __log.error("Invalid value for SEP property " + 
MessageExchange.PROPERTY_SEP_PARTNERROLE_EPR + ": " + epr);
+            __log.error("Invalid value for SEP property " + 
WSMessageExchange.PROPERTY_SEP_PARTNERROLE_EPR + ": " + epr);
         }
 
         return null;
     }
 
     public String getSourceSessionId(String mexId) {
-        MessageExchangeDAO dao = 
_dao.getConnection().getMessageExchange(mexId);
-        return 
dao.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
+        MessageExchangeDAO dao = getExistingMex(mexId);
+        return 
dao.getProperty(WSMessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
     }
 
     public void registerActivityForRecovery(String channel, long activityId, 
String reason, Date dateTime,
@@ -1032,4 +1169,15 @@
     public Node getProcessProperty(QName propertyName) {
         return _bpelProcess.getProcessProperty(propertyName);
     }  
+
+    private MessageExchangeDAO getExistingMex(String mexId) {
+        MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId);
+        if (dao == null) {
+            // this should not happen....
+            String msg = "Engine requested non-existent message exchange: " + mexId;
+            __log.error(msg);
+            throw new BpelEngineException(msg);
+        }
+        return dao;
+    }
 }

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=773939&r1=773938&r2=773939&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue May 12 15:23:30 2009
@@ -39,28 +39,11 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.dao.BpelDAOConnection;
-import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.*;
 import org.apache.ode.bpel.evar.ExternalVariableModule;
 import org.apache.ode.bpel.evt.BpelEvent;
 import org.apache.ode.bpel.extension.ExtensionBundleRuntime;
-import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.BpelEventListener;
-import org.apache.ode.bpel.iapi.BpelServer;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Endpoint;
-import org.apache.ode.bpel.iapi.EndpointReferenceContext;
-import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
@@ -70,6 +53,8 @@
 import org.apache.ode.utils.msg.MessageBundle;
 import org.apache.ode.utils.stl.CollectionsX;
 import org.apache.ode.utils.stl.MemberOfFunction;
+import org.apache.ode.bpel.rapi.ProcessModel;
+import org.apache.ode.bpel.extension.ExtensionBundleRuntime;
 
 /**
  * <p>
@@ -107,10 +92,12 @@
     private final HashMap<QName, ODEProcess> _registeredProcesses = new 
HashMap<QName, ODEProcess>();
 
     /** Mapping from myrole service name to active process. */
-    private final HashMap<QName, List<ODEProcess>> _serviceMap = new 
HashMap<QName, List<ODEProcess>>();
+    private final HashMap<QName, List<ODEProcess>> _wsServiceMap = new 
HashMap<QName, List<ODEProcess>>();
+
+    private final HashMap<String, ODERESTProcess> _restServiceMap = new 
HashMap<String, ODERESTProcess>();
 
     /** Weak-reference cache of all the my-role message exchange objects. */
-    private final MyRoleMessageExchangeCache _myRoleMexCache = new 
MyRoleMessageExchangeCache();
+    private final IncomingMessageExchangeCache _incomingMexCache = new 
IncomingMessageExchangeCache();
 
     private State _state = State.SHUTDOWN;
 
@@ -136,7 +123,7 @@
     private final AtomicLong _lastTimeOfServerCallable = new 
AtomicLong(System.currentTimeMillis());
     
     /** Mapping from a potentially shared endpoint to its EPR */ 
-    private SharedEndpoints _sharedEps;        
+    private SharedEndpoints _sharedEps;
 
     static {
         // TODO Clean this up and factorize engine configuration
@@ -144,7 +131,7 @@
             String processMaxAge = System.getProperty("ode.process.maxage");
             if (processMaxAge != null && processMaxAge.length() > 0) {
                 __processMaxAge = Long.valueOf(processMaxAge);
-                __log.info("Process definition max age adjusted. Max age = " + 
__processMaxAge + "ms.");
+                __log.debug("Process definition max age adjusted. Max age = " 
+ __processMaxAge + "ms.");
             }
         } catch (Throwable t) {
             if (__log.isDebugEnabled()) {
@@ -223,7 +210,7 @@
             
             _contexts.scheduler.start();
             _state = State.RUNNING;
-            __log.info(__msgs.msgServerStarted());
+            __log.debug(__msgs.msgServerStarted());
             if (_dehydrationPolicy != null)
                 new Thread(new ProcessDefReaper()).start();
         } finally {
@@ -292,7 +279,7 @@
 
             _contexts.scheduler.stop();
             _state = State.INIT;
-            __log.info(__msgs.msgServerStopped());
+            __log.debug(__msgs.msgServerStopped());
         } finally {
             _mngmtLock.writeLock().unlock();
         }
@@ -337,8 +324,7 @@
 
         __log.debug("register: " + conf.getProcessId());
 
-        // Ok, IO out of the way, we will mod the server state, so need to get 
a
-        // lock.
+        // Ok, IO out of the way, we will mod the server state, so need to get 
a lock.
         try {
             _mngmtLock.writeLock().lockInterruptibly();
         } catch (InterruptedException ie) {
@@ -355,34 +341,41 @@
 
             __log.debug("Registering process " + conf.getProcessId() + " with 
server.");
 
-            ODEProcess process = new ODEProcess(this, conf, null, 
_myRoleMexCache);
-
-            for (Endpoint e : process.getServiceNames()) {
-                __log.debug("Register process: serviceId=" + e + ", process=" 
+ process);
-                // Get the list of processes associated with the given service
-                List<ODEProcess> processes = _serviceMap.get(e.serviceName);
-                if (processes == null) {
-                    // Create an empty list, if no processes were associated
-                    _serviceMap.put(e.serviceName, processes = new 
ArrayList<ODEProcess>());
+            ODEProcess process;
+            if (conf.isRestful()) {
+                ODERESTProcess restProcess = new ODERESTProcess(this, conf, 
null, _incomingMexCache);
+                for (String resUrl : restProcess.initResources()) {
+                    _restServiceMap.put(resUrl, restProcess);
                 }
-                // Remove any older version of the process from the list
-                for (int i = 0; i < processes.size(); i++) {
-                    ODEProcess cachedVersion = processes.get(i);
-                    __log.debug("cached version " + cachedVersion.getPID() + " 
vs registering version " + process.getPID());
-                    if 
(cachedVersion.getProcessType().equals(process.getProcessType())) {
-                        processes.remove(cachedVersion);
+                process = restProcess;
+            } else {
+                ODEWSProcess wsProcess = new ODEWSProcess(this, conf, null, 
_incomingMexCache);
+                for (Endpoint e : wsProcess.getServiceNames()) {
+                    __log.debug("Register process: serviceId=" + e + ", 
process=" + wsProcess);
+                    // Get the list of processes associated with the given 
service
+                    List<ODEProcess> processes = 
_wsServiceMap.get(e.serviceName);
+                    // Create an empty list, if no processes were associated
+                    if (processes == null)
+                        _wsServiceMap.put(e.serviceName, processes = new 
ArrayList<ODEProcess>());
+
+                    // Remove any older version of the process from the list
+                    for (int i = 0; i < processes.size(); i++) {
+                        ODEProcess cachedVersion = processes.get(i);
+                        __log.debug("cached version " + cachedVersion.getPID() 
+ " vs registering version " + wsProcess.getPID());
+                        if 
(cachedVersion.getProcessType().equals(wsProcess.getProcessType()))
+                            processes.remove(cachedVersion);
                     }
+                    // Add the given process to the list associated with the 
given service
+                    processes.add(wsProcess);
                 }
-                // Add the given process to the list associated with the given 
service
-                processes.add(process);
+                process = wsProcess;
             }
 
             process.activate(_contexts);
-
             _registeredProcesses.put(process.getPID(), process);
             if (_dehydrationPolicy == null) process.hydrate();
 
-            __log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
+            __log.debug(__msgs.msgProcessRegistered(conf.getProcessId()));
         } finally {
             _mngmtLock.writeLock().unlock();
         }
@@ -410,12 +403,12 @@
             
             // Remove the process from any services that might reference it.
             // However, don't remove the service itself from the map.
-            for (List<ODEProcess> processes : _serviceMap.values()) {
+            for (List<ODEProcess> processes : _wsServiceMap.values()) {
                 __log.debug("removing process " + pid + "; handle " + p + "; 
exists " + processes.contains(p));
                 processes.remove(p);
             }
 
-            __log.info(__msgs.msgProcessUnregistered(pid));
+            __log.debug(__msgs.msgProcessUnregistered(pid));
 
         } catch (Exception ex) {
             __log.error(__msgs.msgProcessUnregisterFailed(pid), ex);
@@ -463,7 +456,7 @@
         // one service is listening on the same endpoint.
         _mngmtLock.readLock().lock();
         try {
-            return _serviceMap.get(service);
+            return _wsServiceMap.get(service);
         } finally {
             _mngmtLock.readLock().unlock();
         }
@@ -514,7 +507,7 @@
                     public Void call() throws Exception {
                         _contexts.scheduler.jobCompleted(jobInfo.jobName);
                         Date future = new Date(System.currentTimeMillis() + 
(60 * 1000));
-                        
__log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), 
jobInfo.jobName, future));
+                        
__log.debug(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), 
jobInfo.jobName, future));
                         
_contexts.scheduler.schedulePersistedJob(we.getDetail(), future);            
                         return null;
                     }
@@ -621,7 +614,38 @@
             _mngmtLock.readLock().unlock();
         }
     }
-    
+
+    public RESTInMessageExchange createMessageExchange(final Resource resource, String foreignKey) throws BpelEngineException {
+        _mngmtLock.readLock().lock();
+        try {
+            ODERESTProcess target = _restServiceMap.get(resource.getUrl());
+            if (target == null) {
+                try {
+                    QName processId = _contexts.execTransaction(new Callable<QName>() {
+                        public QName call() {
+                            ResourceRouteDAO rr = _contexts.dao.getConnection()
+                                    .getResourceRoute(resource.getUrl(), resource.getMethod());
+                            if (rr == null) return null;
+                            ProcessDAO processDao = rr.getInstance().getProcess();
+                            return processDao.getProcessId();
+                        }
+                    });
+                    for (ODEProcess odeRestProcess : _registeredProcesses.values()) {
+                        if (odeRestProcess._pid.equals(processId)) target = (ODERESTProcess)odeRestProcess;
+                    }
+                } catch (Exception e) {
+                    throw new BpelEngineException(e);
+                }
+            }
+
+            if (target == null) throw new BpelEngineException("No such resource: " + resource.getUrl());
+            assertNoTransaction();
+            return target.createRESTMessageExchange(resource, foreignKey);
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+    }
+
     /**
      * Return a simple type of MEX for a given process target
      * @param process
@@ -638,7 +662,7 @@
         else
             assertNoTransaction();
     
-        return process.createNewMyRoleMex(istyle, targetService, operation);
+           return ((ODEWSProcess)process).createNewMyRoleMex(istyle, targetService, operation);
     }
     
     /**
@@ -698,9 +722,9 @@
 
                     switch (mexdao.getDirection()) {
                     case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
-                        return process.createPartnerRoleMex(mexdao);
+                        return ((ODEWSProcess)process).createPartnerRoleMex(mexdao);
                     case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
-                        return process.lookupMyRoleMex(mexdao);
+                        return ((ODEWSProcess)process).lookupMyRoleMex(mexdao);
                     default:
                         String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
                         __log.fatal(errmsg);
@@ -811,15 +835,12 @@
     void scheduleRunnable(final Runnable runnable) {
         assertTransaction();
         _contexts.registerCommitSynchronizer(new Runnable() {
-
             public void run() {
                 _exec.submit(new ServerRunnable(runnable));
             }
             
         });
-        
     }
-
     
     protected void assertTransaction() {
         if (!_contexts.isTransacted())
@@ -893,7 +914,7 @@
                     }
                 }
             } catch (InterruptedException e) {
-                __log.info(e);
+                __log.debug(e);
             }
         }
     }
@@ -911,7 +932,6 @@
         _lastTimeOfServerCallable.set(System.currentTimeMillis());
         
     }
-   
     
     class ServerRunnable implements Runnable {
         final Runnable _work;

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/Contexts.java?rev=773939&r1=773938&r2=773939&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/Contexts.java Tue May 12 15:23:30 2009
@@ -81,12 +81,10 @@
     public void execTransaction(final Runnable transaction) {
         try {
             execTransaction(new Callable<Void>() {
-
                 public Void call() throws Exception {
                     transaction.run();
                     return null;
                 }
-
             });
         } catch (Exception e) {
             throw new BpelEngineException(e);

Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/IncomingMessageExchangeCache.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/IncomingMessageExchangeCache.java?rev=773939&view=auto
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/IncomingMessageExchangeCache.java (added)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/IncomingMessageExchangeCache.java Tue May 12 15:23:30 2009
@@ -0,0 +1,61 @@
+package org.apache.ode.bpel.engine;
+
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+
+/**
+ * Manage {@link IncomingMessageExchangeCache} object references.
+ */
+class IncomingMessageExchangeCache {
+
+    private static final int CLEANUP_PERIOD = 20;
+
+    private Map<String, WeakReference<MessageExchangeImpl>> _cache = new ConcurrentHashMap<String, WeakReference<MessageExchangeImpl>>();
+
+    private int _inserts = 0;
+
+    void put(MessageExchangeImpl mex) {
+        ++_inserts;
+        if (_inserts > CLEANUP_PERIOD) cleanup();
+
+        WeakReference<MessageExchangeImpl> ref = _cache.get(mex.getMessageExchangeId());
+        if (ref != null && ref.get() != null)
+            throw new IllegalStateException("InternalError: duplicate myrolemex registration!");
+
+        _cache.put(mex.getMessageExchangeId(), new WeakReference<MessageExchangeImpl>(mex));
+    }
+
+    /**
+     * Retrieve a {@link MyRoleMessageExchangeImpl} from the cache, re-creating if necessary.
+     * 
+     * @param mexdao
+     * @return
+     */
+    MessageExchangeImpl get(MessageExchangeDAO mexdao, ODEProcess process) {
+        WeakReference<MessageExchangeImpl> ref = _cache.get(mexdao.getMessageExchangeId());
+        MessageExchangeImpl mex = ref == null ? null : ref.get();
+
+        if (mex == null) {
+            mex = process.recreateIncomingMex(mexdao);
+            _cache.put(mexdao.getMessageExchangeId(), new WeakReference<MessageExchangeImpl>(mex));
+        }
+        return mex;
+    }
+
+    /**
+     * Remove stale references.
+     * 
+     */
+    private void cleanup() {
+        for (Iterator<WeakReference<MessageExchangeImpl>> i = _cache.values().iterator(); i.hasNext();) {
+            WeakReference<MessageExchangeImpl> ref = i.next();
+            if (ref.get() == null) i.remove();
+        }
+        _inserts = 0;
+    }
+}

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?rev=773939&r1=773938&r2=773939&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue May 12 15:23:30 2009
@@ -22,7 +22,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.concurrent.*;
 
 import javax.wsdl.Operation;
 import javax.wsdl.PortType;
@@ -159,8 +159,8 @@
     }
 
     void save(MessageExchangeDAO dao) {
-        dao.setPartnerLinkModelId(_oplink.getId());
-        dao.setOperation(_operation.getName());
+        if (_oplink != null) dao.setPartnerLinkModelId(_oplink.getId());
+        if (_operation != null) dao.setOperation(_operation.getName());
         dao.setStatus(_status);
         dao.setInvocationStyle(getInvocationStyle());
         dao.setFault(_fault);
@@ -413,5 +413,51 @@
     void sync() {
         ++_syncdummy;
     }
+
+    protected static class ResponseFuture implements Future<Status> {
+        private Status _status;
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        public Status get() throws InterruptedException, ExecutionException {
+            try {
+                return get(0, TimeUnit.MILLISECONDS);
+            } catch (TimeoutException e) {
+                // If it's thrown it's definitely a bug
+                throw new RuntimeException(e);
+            }
+        }
+
+        public Status get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            synchronized (this) {
+                if (_status != null)
+                    return _status;
+
+                this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+
+                if (_status == null) throw new TimeoutException();
+                return _status;
+            }
+        }
+
+        public boolean isCancelled() {
+            return false;
+        }
+
+        public boolean isDone() {
+            return _status != null;
+        }
+
+        void done(Status status) {
+            synchronized (this) {
+                _status = status;
+                this.notifyAll();
+            }
+        }
+    }
+
+
 }
 


Reply via email to