Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=773939&r1=773938&r2=773939&view=diff ============================================================================== --- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original) +++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue May 12 15:23:30 2009 @@ -21,10 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.List; +import java.util.*; import javax.wsdl.Operation; import javax.xml.namespace.QName; @@ -52,11 +49,7 @@ import org.apache.ode.bpel.evt.ProcessMessageExchangeEvent; import org.apache.ode.bpel.evt.ProcessTerminationEvent; import org.apache.ode.bpel.evt.ScopeEvent; -import org.apache.ode.bpel.iapi.BpelEngineException; -import org.apache.ode.bpel.iapi.ContextException; -import org.apache.ode.bpel.iapi.EndpointReference; -import org.apache.ode.bpel.iapi.MessageExchange; -import org.apache.ode.bpel.iapi.PartnerRoleChannel; +import org.apache.ode.bpel.iapi.*; import org.apache.ode.bpel.iapi.MessageExchange.AckType; import org.apache.ode.bpel.iapi.MessageExchange.FailureType; import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern; @@ -84,6 +77,10 @@ import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; +import org.apache.ode.bpel.rapi.*; +import org.apache.ode.bpel.rapi.Resource; + +import org.w3c.dom.*; class BpelRuntimeContextImpl implements OdeRTInstanceContext { @@ -137,7 +134,7 @@ return "{BpelRuntimeCtx PID=" + _bpelProcess.getPID() + ", IID=" + _iid + "}"; } - public Long getPid() { + public Long getInstanceId() { return _iid; } @@ -177,7 +174,7 @@ public boolean 
isPartnerRoleEndpointInitialized(PartnerLink pLink) { PartnerLinkDAO spl = fetchPartnerLinkDAO(pLink); - return spl.getPartnerEPR() != null || _bpelProcess.getInitialPartnerRoleEPR(pLink.getModel()) != null; + return spl.getPartnerEPR() != null || ((ODEWSProcess)_bpelProcess).getInitialPartnerRoleEPR(pLink.getModel()) != null; } public void completedFault(FaultInfo faultData) { @@ -188,6 +185,8 @@ _dao.setFault(faultData.getFaultName(), faultData.getExplanation(), faultData.getFaultLineNo(), faultData.getActivityId(), faultData.getFaultMessage()); + cleanupResourceRoutes(); + // send event ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent(); evt.setOldState(_dao.getState()); @@ -206,6 +205,8 @@ ODEProcess.__log.debug("ProcessImpl " + _bpelProcess.getPID() + " completed OK."); } + cleanupResourceRoutes(); + // send event ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent(); evt.setOldState(_dao.getState()); @@ -249,6 +250,31 @@ } } + public void initializeResource(Long parentScopeId, ResourceModel resource, String url) { + ScopeDAO parent = _dao.getScope(parentScopeId); + // Storing the resource as a variable + XmlDataDAO resourceData = parent.getVariable(resource.getName()); + Document doc = DOMUtils.newDocument(); + resourceData.set(doc.createTextNode(url)); + } + + public void initializeInstantiatingUrl(String url) { + _dao.setInstantiatingUrl(url); + _instantiatingMessageExchange.setResource(url + "~POST"); + } + + public String getInstantiatingUrl() { + return _dao.getInstantiatingUrl(); + } + + public String readResource(Long parentScopeId, ResourceModel resource) { + ScopeDAO parent = _dao.getScope(parentScopeId); + XmlDataDAO resourceData = parent.getVariable(resource.getName()); + Node resourceNode = resourceData.get(); + if (resourceData.isNull()) return null; + else return ((Text)resourceNode).getWholeText(); + } + public void select(String selectChannelId, Date timeout, Selector[] selectors) { if 
(ODEProcess.__log.isTraceEnabled()) ODEProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("select", new Object[] { "pickResponseChannel", @@ -314,6 +340,39 @@ } } + public void checkResourceRoute(Resource resourceInstance, String pickResponseChannel, int selectorIdx) { + // check if this is first pick + if (_dao.getState() == ProcessState.STATE_NEW) { + // send event + ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent(); + evt.setOldState(ProcessState.STATE_NEW); + _dao.setState(ProcessState.STATE_READY); + evt.setNewState(ProcessState.STATE_READY); + sendEvent(evt); + } + + String method = resourceInstance.getModel().getMethod(); + if (_instantiatingMessageExchange != null && method.equals("POST") && _dao.getState() == ProcessState.STATE_READY) + injectMyRoleMessageExchange(pickResponseChannel, selectorIdx, _instantiatingMessageExchange); + else { + String url = readResource(resourceInstance.getScopeInstanceId(), resourceInstance.getModel()); + _dao.createResourceRoute(url, method, pickResponseChannel, selectorIdx); + org.apache.ode.bpel.iapi.Resource res = new org.apache.ode.bpel.iapi.Resource(url, "application/xml", method); + _bpelProcess._contexts.bindingContext.activateProvidedResource(res); + } + + // TODO schedule a matcher to see if the message arrived already + } + + private void cleanupResourceRoutes() { + Set<String> routes = _dao.getAllResourceRoutes(); + for (String route : routes) { + String[] resArr = route.split("~"); + org.apache.ode.bpel.iapi.Resource res = new org.apache.ode.bpel.iapi.Resource(resArr[0], "application/xml", resArr[1]); + _bpelProcess._contexts.bindingContext.deactivateProvidedResource(res); + } + } + public CorrelationKey readCorrelation(CorrelationSet cset) { ScopeDAO scopeDAO = _dao.getScope(cset.getScopeId()); CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.getName()); @@ -325,7 +384,7 @@ PartnerLinkDAO pl = fetchPartnerLinkDAO(pLink); Element epr = pl.getPartnerEPR(); if (epr == null) { - 
EndpointReference e = _bpelProcess.getInitialPartnerRoleEPR(pLink.getModel()); + EndpointReference e = ((ODEWSProcess)_bpelProcess).getInitialPartnerRoleEPR(pLink.getModel()); if (e != null) epr = e.toXML().getDocumentElement(); } @@ -334,7 +393,7 @@ } public Element fetchMyRoleEndpointReferenceData(PartnerLink pLink) { - return _bpelProcess.getInitialMyRoleEPR(pLink.getModel()).toXML().getDocumentElement(); + return ((ODEWSProcess)_bpelProcess).getInitialMyRoleEPR(pLink.getModel()).toXML().getDocumentElement(); } private PartnerLinkDAO fetchPartnerLinkDAO(PartnerLink pLink) { @@ -417,7 +476,7 @@ evt.setPortType(plink.getModel().getMyRolePortType().getQName()); // Get the "my-role" mex from the DB. - MessageExchangeDAO myrolemex = _dao.getConnection().getMessageExchange(mexId); + MessageExchangeDAO myrolemex = getExistingMex(mexId); Operation operation = plink.getModel().getMyRoleOperation(opName); if (operation == null || operation.getOutput() == null) throw new NoSuchOperationException(); @@ -443,7 +502,7 @@ myrolemex.setStatus(Status.ACK); myrolemex.setAckType(ackType); try { - _bpelProcess.onMyRoleMexAck(myrolemex, previousStatus); + ((ODEWSProcess)_bpelProcess).onMyRoleMexAck(myrolemex, previousStatus); } finally { if (myrolemex.getPipedMessageExchangeId() != null) { myrolemex.release(_bpelProcess.isCleanupCategoryEnabled(myrolemex.getAckType() == MessageExchange.AckType.RESPONSE, CLEANUP_CATEGORY.MESSAGES)); @@ -452,6 +511,42 @@ sendEvent(evt); } + public void reply(String mexId, Resource resource, Element msg, QName fault) throws NoSuchOperationException { + // prepare event + ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent(); + evt.setMexId(mexId); + evt.setResource(resource.getName()); + + MessageExchangeDAO mex = getExistingMex(mexId); + MessageDAO message = mex.createMessage(null); + buildOutgoingMessage(message, msg); + mex.setResponse(message); + + AckType ackType; + if (fault != null) { + ackType = AckType.FAULT; + 
mex.setFault(fault); + evt.setAspect(ProcessMessageExchangeEvent.PROCESS_FAULT); + } else { + ackType = AckType.RESPONSE; + evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT); + } + + String url = readResource(resource.getScopeInstanceId(), resource.getModel()); + + Status previousStatus = mex.getStatus(); + mex.setStatus(Status.ACK); + mex.setAckType(ackType); + try { + ((ODERESTProcess)_bpelProcess).onRestMexAck(mex, previousStatus, url); + } finally { + if (mex.getPipedMessageExchangeId() != null) { + mex.release(_bpelProcess.isCleanupCategoryEnabled(mex.getAckType() == MessageExchange.AckType.RESPONSE, CLEANUP_CATEGORY.MESSAGES)); + } + } + sendEvent(evt); + } + public void writeCorrelation(CorrelationSet cset, QName[] propNames, CorrelationKey correlation) throws FaultException { // enforce unique correlation set constraint ProcessDAO processDAO = _dao.getProcess(); @@ -494,7 +589,6 @@ } private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey key) { - WorkEvent we = new WorkEvent(); we.setIID(_dao.getInstanceId()); we.setProcessId(_bpelProcess.getPID()); @@ -507,7 +601,6 @@ public String invoke(String requestId, PartnerLink partnerLink, Operation operation, Element outgoingMessage) throws UninitializedPartnerEPR { - // TODO: think we should move the dao creation into bpelprocess --mbs MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(), MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE); mexDao.setStatus(MessageExchange.Status.REQ); @@ -515,7 +608,7 @@ mexDao.setPortType(partnerLink.getModel().getPartnerRolePortType().getQName()); mexDao.setPartnerLinkModelId(partnerLink.getModel().getId()); - PartnerRoleChannel partnerRoleChannel = _bpelProcess.getPartnerRoleChannel(partnerLink.getModel()); + PartnerRoleChannel partnerRoleChannel = ((ODEWSProcess)_bpelProcess).getPartnerRoleChannel(partnerLink.getModel()); PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink); Element partnerEPR = 
plinkDAO.getPartnerEPR(); @@ -580,6 +673,52 @@ return mexDao.getMessageExchangeId(); } + public String invoke(String requestId, org.apache.ode.bpel.iapi.Resource resource, Element outgoingMessage) { + + MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(), + MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE); + mexDao.setStatus(MessageExchange.Status.REQ); + mexDao.setResource(resource.getUrl() + "~" + resource.getMethod()); + mexDao.setProcess(_dao.getProcess()); + mexDao.setInstance(_dao); + mexDao.setPattern(MessageExchangePattern.REQUEST_RESPONSE); + mexDao.setChannel(requestId); + + if (outgoingMessage != null) { + MessageDAO message = mexDao.createMessage(null); + mexDao.setRequest(message); + mexDao.setTimeout(30000); + message.setData(outgoingMessage); + } + + // prepare event + ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent(); + evt.setResource(resource.getUrl() + "~" + resource.getMethod()); + evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT); + evt.setMexId(mexDao.getMessageExchangeId()); + sendEvent(evt); + + if (__log.isDebugEnabled()) + __log.debug("INVOKING PARTNER: resource=" + resource + " channel=" + requestId + ")"); + + _bpelProcess.invokePartner(mexDao); + + // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations, + // we need to inject a message on the response channel, so that the process continues. + switch (mexDao.getStatus()) { + case ACK: + if (mexDao.getChannel() != null) injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel()); + break; + case ASYNC: + // we'll have to wait for the response. 
+ break; + default: + throw new AssertionError("Unexpected MEX status: " + mexDao.getStatus()); + } + + return mexDao.getMessageExchangeId(); + } + private void buildOutgoingMessage(MessageDAO message, Element outgoingElmt) { if (outgoingElmt == null) return; @@ -728,7 +867,7 @@ __log.debug("<invoke> response for mexid " + mexid + " and channel " + invokeId); } - MessageExchangeDAO mex = _dao.getConnection().getMessageExchange(mexid); + MessageExchangeDAO mex = getExistingMex(mexid); ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent(); evt.setPortType(mex.getPortType()); @@ -751,6 +890,12 @@ irt = OdeRTInstance.InvokeResponseType.FAILURE; evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAILURE); break; + case ONEWAY: + // A ws-style one-way invoke won't even go there as there's no response channel, only used + // for rest style where you get a 204 after the fact. + irt = OdeRTInstance.InvokeResponseType.REPLY; + evt.setAspect(ProcessMessageExchangeEvent.PARTNER_OUTPUT); + break; default: String msg = "Invalid response state for mex " + mexid + ": " + status; __log.error(msg); @@ -766,7 +911,7 @@ event.setProcessId(_dao.getProcess().getProcessId()); event.setProcessName(_dao.getProcess().getType()); event.setProcessInstanceId(_dao.getInstanceId()); - _bpelProcess._debugger.onEvent(event); + if (_bpelProcess._debugger != null) _bpelProcess._debugger.onEvent(event); //filter events List<String> scopeNames = null; @@ -794,7 +939,7 @@ } mexDao.setFaultExplanation("Process did not respond."); mexDao.setStatus(Status.ACK); - _bpelProcess.onMyRoleMexAck(mexDao, status); + ((ODEWSProcess)_bpelProcess).onMyRoleMexAck(mexDao, status); } } @@ -803,18 +948,12 @@ } public Element getMyRequest(String mexId) { - MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId); - if (dao == null) { - // this should not happen.... 
- String msg = "Engine requested non-existent message exchange: " + mexId; - __log.fatal(msg); - throw new BpelEngineException(msg); - } + MessageExchangeDAO dao = getExistingMex(mexId); if (dao.getDirection() != MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE) { // this should not happen.... String msg = "Engine requested my-role request for a partner-role mex: " + mexId; - __log.fatal(msg); + __log.error(msg); throw new BpelEngineException(msg); } @@ -822,13 +961,23 @@ if (request == null) { // this also should not happen String msg = "Engine requested request for message exchange that did not have one: " + mexId; - __log.fatal(msg); + __log.error(msg); throw new BpelEngineException(msg); } return mergeHeaders(request); } + public Map<String,String> getProperties(String mexId) { + MessageExchangeDAO dao = getExistingMex(mexId); + return dao.getProperties(); + } + + public void setInstantiatingMex(String mexId) { + MessageExchangeDAO mex = getExistingMex(mexId); + mex.setInstantiatingResource(true); + } + private Element mergeHeaders(MessageDAO msg) { // Merging header data, it's all stored in the same variable Element data = msg.getData(); @@ -852,13 +1001,7 @@ } public QName getPartnerFault(String mexId) { - MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId); - if (dao == null) { - // this should not happen.... - String msg = "Engine requested non-existent message exchange: " + mexId; - __log.fatal(msg); - throw new BpelEngineException(msg); - } + MessageExchangeDAO dao = getExistingMex(mexId); return dao.getFault(); } @@ -872,13 +1015,7 @@ } private MessageDAO _getPartnerResponse(String mexId) { - MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId); - if (dao == null) { - // this should not happen.... 
- String msg = "Engine requested non-existent message exchange: " + mexId; - __log.fatal(msg); - throw new BpelEngineException(msg); - } + MessageExchangeDAO dao = getExistingMex(mexId); if (dao.getDirection() != MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE) { // this should not happen.... String msg = "Engine requested partner response for a my-role mex: " + mexId; @@ -912,22 +1049,22 @@ public Element getSourceEPR(String mexId) { - MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId); - String epr = dao.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_EPR); + MessageExchangeDAO dao = getExistingMex(mexId); + String epr = dao.getProperty(WSMessageExchange.PROPERTY_SEP_PARTNERROLE_EPR); if (epr == null) return null; try { return DOMUtils.stringToDOM(epr); } catch (Exception ex) { - __log.error("Invalid value for SEP property " + MessageExchange.PROPERTY_SEP_PARTNERROLE_EPR + ": " + epr); + __log.error("Invalid value for SEP property " + WSMessageExchange.PROPERTY_SEP_PARTNERROLE_EPR + ": " + epr); } return null; } public String getSourceSessionId(String mexId) { - MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId); - return dao.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID); + MessageExchangeDAO dao = getExistingMex(mexId); + return dao.getProperty(WSMessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID); } public void registerActivityForRecovery(String channel, long activityId, String reason, Date dateTime, @@ -1032,4 +1169,15 @@ public Node getProcessProperty(QName propertyName) { return _bpelProcess.getProcessProperty(propertyName); } + + private MessageExchangeDAO getExistingMex(String mexId) { + MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId); + if (dao == null) { + // this should not happen.... + String msg = "Engine requested non-existent message exchange: " + mexId; + __log.error(msg); + throw new BpelEngineException(msg); + } + return dao; + } }
Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=773939&r1=773938&r2=773939&view=diff ============================================================================== --- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original) +++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue May 12 15:23:30 2009 @@ -39,28 +39,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ode.bpel.dao.BpelDAOConnection; -import org.apache.ode.bpel.dao.BpelDAOConnectionFactory; -import org.apache.ode.bpel.dao.MessageExchangeDAO; -import org.apache.ode.bpel.dao.ProcessDAO; +import org.apache.ode.bpel.dao.*; import org.apache.ode.bpel.evar.ExternalVariableModule; import org.apache.ode.bpel.evt.BpelEvent; import org.apache.ode.bpel.extension.ExtensionBundleRuntime; -import org.apache.ode.bpel.iapi.BindingContext; -import org.apache.ode.bpel.iapi.BpelEngineException; -import org.apache.ode.bpel.iapi.BpelEventListener; -import org.apache.ode.bpel.iapi.BpelServer; -import org.apache.ode.bpel.iapi.ContextException; -import org.apache.ode.bpel.iapi.Endpoint; -import org.apache.ode.bpel.iapi.EndpointReferenceContext; -import org.apache.ode.bpel.iapi.InvocationStyle; -import org.apache.ode.bpel.iapi.Message; -import org.apache.ode.bpel.iapi.MessageExchange; -import org.apache.ode.bpel.iapi.MessageExchangeContext; -import org.apache.ode.bpel.iapi.MyRoleMessageExchange; -import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; -import org.apache.ode.bpel.iapi.ProcessConf; -import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.bpel.iapi.*; import org.apache.ode.bpel.iapi.Scheduler.JobInfo; import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException; import org.apache.ode.bpel.intercept.MessageExchangeInterceptor; 
@@ -70,6 +53,8 @@ import org.apache.ode.utils.msg.MessageBundle; import org.apache.ode.utils.stl.CollectionsX; import org.apache.ode.utils.stl.MemberOfFunction; +import org.apache.ode.bpel.rapi.ProcessModel; +import org.apache.ode.bpel.extension.ExtensionBundleRuntime; /** * <p> @@ -107,10 +92,12 @@ private final HashMap<QName, ODEProcess> _registeredProcesses = new HashMap<QName, ODEProcess>(); /** Mapping from myrole service name to active process. */ - private final HashMap<QName, List<ODEProcess>> _serviceMap = new HashMap<QName, List<ODEProcess>>(); + private final HashMap<QName, List<ODEProcess>> _wsServiceMap = new HashMap<QName, List<ODEProcess>>(); + + private final HashMap<String, ODERESTProcess> _restServiceMap = new HashMap<String, ODERESTProcess>(); /** Weak-reference cache of all the my-role message exchange objects. */ - private final MyRoleMessageExchangeCache _myRoleMexCache = new MyRoleMessageExchangeCache(); + private final IncomingMessageExchangeCache _incomingMexCache = new IncomingMessageExchangeCache(); private State _state = State.SHUTDOWN; @@ -136,7 +123,7 @@ private final AtomicLong _lastTimeOfServerCallable = new AtomicLong(System.currentTimeMillis()); /** Mapping from a potentially shared endpoint to its EPR */ - private SharedEndpoints _sharedEps; + private SharedEndpoints _sharedEps; static { // TODO Clean this up and factorize engine configuration @@ -144,7 +131,7 @@ String processMaxAge = System.getProperty("ode.process.maxage"); if (processMaxAge != null && processMaxAge.length() > 0) { __processMaxAge = Long.valueOf(processMaxAge); - __log.info("Process definition max age adjusted. Max age = " + __processMaxAge + "ms."); + __log.debug("Process definition max age adjusted. 
Max age = " + __processMaxAge + "ms."); } } catch (Throwable t) { if (__log.isDebugEnabled()) { @@ -223,7 +210,7 @@ _contexts.scheduler.start(); _state = State.RUNNING; - __log.info(__msgs.msgServerStarted()); + __log.debug(__msgs.msgServerStarted()); if (_dehydrationPolicy != null) new Thread(new ProcessDefReaper()).start(); } finally { @@ -292,7 +279,7 @@ _contexts.scheduler.stop(); _state = State.INIT; - __log.info(__msgs.msgServerStopped()); + __log.debug(__msgs.msgServerStopped()); } finally { _mngmtLock.writeLock().unlock(); } @@ -337,8 +324,7 @@ __log.debug("register: " + conf.getProcessId()); - // Ok, IO out of the way, we will mod the server state, so need to get a - // lock. + // Ok, IO out of the way, we will mod the server state, so need to get a lock. try { _mngmtLock.writeLock().lockInterruptibly(); } catch (InterruptedException ie) { @@ -355,34 +341,41 @@ __log.debug("Registering process " + conf.getProcessId() + " with server."); - ODEProcess process = new ODEProcess(this, conf, null, _myRoleMexCache); - - for (Endpoint e : process.getServiceNames()) { - __log.debug("Register process: serviceId=" + e + ", process=" + process); - // Get the list of processes associated with the given service - List<ODEProcess> processes = _serviceMap.get(e.serviceName); - if (processes == null) { - // Create an empty list, if no processes were associated - _serviceMap.put(e.serviceName, processes = new ArrayList<ODEProcess>()); + ODEProcess process; + if (conf.isRestful()) { + ODERESTProcess restProcess = new ODERESTProcess(this, conf, null, _incomingMexCache); + for (String resUrl : restProcess.initResources()) { + _restServiceMap.put(resUrl, restProcess); } - // Remove any older version of the process from the list - for (int i = 0; i < processes.size(); i++) { - ODEProcess cachedVersion = processes.get(i); - __log.debug("cached version " + cachedVersion.getPID() + " vs registering version " + process.getPID()); - if 
(cachedVersion.getProcessType().equals(process.getProcessType())) { - processes.remove(cachedVersion); + process = restProcess; + } else { + ODEWSProcess wsProcess = new ODEWSProcess(this, conf, null, _incomingMexCache); + for (Endpoint e : wsProcess.getServiceNames()) { + __log.debug("Register process: serviceId=" + e + ", process=" + wsProcess); + // Get the list of processes associated with the given service + List<ODEProcess> processes = _wsServiceMap.get(e.serviceName); + // Create an empty list, if no processes were associated + if (processes == null) + _wsServiceMap.put(e.serviceName, processes = new ArrayList<ODEProcess>()); + + // Remove any older version of the process from the list + for (int i = 0; i < processes.size(); i++) { + ODEProcess cachedVersion = processes.get(i); + __log.debug("cached version " + cachedVersion.getPID() + " vs registering version " + wsProcess.getPID()); + if (cachedVersion.getProcessType().equals(wsProcess.getProcessType())) + processes.remove(cachedVersion); } + // Add the given process to the list associated with the given service + processes.add(wsProcess); } - // Add the given process to the list associated with the given service - processes.add(process); + process = wsProcess; } process.activate(_contexts); - _registeredProcesses.put(process.getPID(), process); if (_dehydrationPolicy == null) process.hydrate(); - __log.info(__msgs.msgProcessRegistered(conf.getProcessId())); + __log.debug(__msgs.msgProcessRegistered(conf.getProcessId())); } finally { _mngmtLock.writeLock().unlock(); } @@ -410,12 +403,12 @@ // Remove the process from any services that might reference it. // However, don't remove the service itself from the map. 
- for (List<ODEProcess> processes : _serviceMap.values()) { + for (List<ODEProcess> processes : _wsServiceMap.values()) { __log.debug("removing process " + pid + "; handle " + p + "; exists " + processes.contains(p)); processes.remove(p); } - __log.info(__msgs.msgProcessUnregistered(pid)); + __log.debug(__msgs.msgProcessUnregistered(pid)); } catch (Exception ex) { __log.error(__msgs.msgProcessUnregisterFailed(pid), ex); @@ -463,7 +456,7 @@ // one service is listening on the same endpoint. _mngmtLock.readLock().lock(); try { - return _serviceMap.get(service); + return _wsServiceMap.get(service); } finally { _mngmtLock.readLock().unlock(); } @@ -514,7 +507,7 @@ public Void call() throws Exception { _contexts.scheduler.jobCompleted(jobInfo.jobName); Date future = new Date(System.currentTimeMillis() + (60 * 1000)); - __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future)); + __log.debug(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future)); _contexts.scheduler.schedulePersistedJob(we.getDetail(), future); return null; } @@ -621,7 +614,38 @@ _mngmtLock.readLock().unlock(); } } - + + public RESTInMessageExchange createMessageExchange(final Resource resource, String foreignKey) throws BpelEngineException { + _mngmtLock.readLock().lock(); + try { + ODERESTProcess target = _restServiceMap.get(resource.getUrl()); + if (target == null) { + try { + QName processId = _contexts.execTransaction(new Callable<QName>() { + public QName call() { + ResourceRouteDAO rr = _contexts.dao.getConnection() + .getResourceRoute(resource.getUrl(), resource.getMethod()); + if (rr == null) return null; + ProcessDAO processDao = rr.getInstance().getProcess(); + return processDao.getProcessId(); + } + }); + for (ODEProcess odeRestProcess : _registeredProcesses.values()) { + if (odeRestProcess._pid.equals(processId)) target = (ODERESTProcess)odeRestProcess; + } + } catch (Exception e) { + throw new BpelEngineException(e); 
+ } + } + + if (target == null) throw new BpelEngineException("No such resource: " + resource.getUrl()); + assertNoTransaction(); + return target.createRESTMessageExchange(resource, foreignKey); + } finally { + _mngmtLock.readLock().unlock(); + } + } + /** * Return a simple type of MEX for a given process target * @param process @@ -638,7 +662,7 @@ else assertNoTransaction(); - return process.createNewMyRoleMex(istyle, targetService, operation); + return ((ODEWSProcess)process).createNewMyRoleMex(istyle, targetService, operation); } /** @@ -698,9 +722,9 @@ switch (mexdao.getDirection()) { case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE: - return process.createPartnerRoleMex(mexdao); + return ((ODEWSProcess)process).createPartnerRoleMex(mexdao); case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE: - return process.lookupMyRoleMex(mexdao); + return ((ODEWSProcess)process).lookupMyRoleMex(mexdao); default: String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId; __log.fatal(errmsg); @@ -811,15 +835,12 @@ void scheduleRunnable(final Runnable runnable) { assertTransaction(); _contexts.registerCommitSynchronizer(new Runnable() { - public void run() { _exec.submit(new ServerRunnable(runnable)); } }); - } - protected void assertTransaction() { if (!_contexts.isTransacted()) @@ -893,7 +914,7 @@ } } } catch (InterruptedException e) { - __log.info(e); + __log.debug(e); } } } @@ -911,7 +932,6 @@ _lastTimeOfServerCallable.set(System.currentTimeMillis()); } - class ServerRunnable implements Runnable { final Runnable _work; Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/Contexts.java URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/Contexts.java?rev=773939&r1=773938&r2=773939&view=diff ============================================================================== --- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original) +++ 
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/Contexts.java Tue May 12 15:23:30 2009 @@ -81,12 +81,10 @@ public void execTransaction(final Runnable transaction) { try { execTransaction(new Callable<Void>() { - public Void call() throws Exception { transaction.run(); return null; } - }); } catch (Exception e) { throw new BpelEngineException(e); Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/IncomingMessageExchangeCache.java URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/IncomingMessageExchangeCache.java?rev=773939&view=auto ============================================================================== --- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/IncomingMessageExchangeCache.java (added) +++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/IncomingMessageExchangeCache.java Tue May 12 15:23:30 2009 @@ -0,0 +1,61 @@ +package org.apache.ode.bpel.engine; + +import java.lang.ref.WeakReference; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.ode.bpel.dao.MessageExchangeDAO; + +/** + * Manage {@link IncomingMessageExchangeCache} object references. 
+ */ +class IncomingMessageExchangeCache { + + private static final int CLEANUP_PERIOD = 20; + + private Map<String, WeakReference<MessageExchangeImpl>> _cache = new ConcurrentHashMap<String, WeakReference<MessageExchangeImpl>>(); + + private int _inserts = 0; + + void put(MessageExchangeImpl mex) { + ++_inserts; + if (_inserts > CLEANUP_PERIOD) cleanup(); + + WeakReference<MessageExchangeImpl> ref = _cache.get(mex.getMessageExchangeId()); + if (ref != null && ref.get() != null) + throw new IllegalStateException("InternalError: duplicate myrolemex registration!"); + + _cache.put(mex.getMessageExchangeId(), new WeakReference<MessageExchangeImpl>(mex)); + } + + /** + * Retrieve a {@link MyRoleMessageExchangeImpl} from the cache, re-creating if necessary. + * + * @param mexdao + * @return + */ + MessageExchangeImpl get(MessageExchangeDAO mexdao, ODEProcess process) { + WeakReference<MessageExchangeImpl> ref = _cache.get(mexdao.getMessageExchangeId()); + MessageExchangeImpl mex = ref == null ? null : ref.get(); + + if (mex == null) { + mex = process.recreateIncomingMex(mexdao); + _cache.put(mexdao.getMessageExchangeId(), new WeakReference<MessageExchangeImpl>(mex)); + } + return mex; + } + + /** + * Remove stale references. 
+ * + */ + private void cleanup() { + for (Iterator<WeakReference<MessageExchangeImpl>> i = _cache.values().iterator(); i.hasNext();) { + WeakReference<MessageExchangeImpl> ref = i.next(); + if (ref.get() == null) i.remove(); + } + _inserts = 0; + } +} Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?rev=773939&r1=773938&r2=773939&view=diff ============================================================================== --- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original) +++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue May 12 15:23:30 2009 @@ -22,7 +22,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.Callable; +import java.util.concurrent.*; import javax.wsdl.Operation; import javax.wsdl.PortType; @@ -159,8 +159,8 @@ } void save(MessageExchangeDAO dao) { - dao.setPartnerLinkModelId(_oplink.getId()); - dao.setOperation(_operation.getName()); + if (_oplink != null) dao.setPartnerLinkModelId(_oplink.getId()); + if (_operation != null) dao.setOperation(_operation.getName()); dao.setStatus(_status); dao.setInvocationStyle(getInvocationStyle()); dao.setFault(_fault); @@ -413,5 +413,51 @@ void sync() { ++_syncdummy; } + + protected static class ResponseFuture implements Future<Status> { + private Status _status; + + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + public Status get() throws InterruptedException, ExecutionException { + try { + return get(0, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // If it's thrown it's definitely a bug + throw new RuntimeException(e); + } + } + + public Status get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + synchronized (this) { + if 
(_status != null) + return _status; + + this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit)); + + if (_status == null) throw new TimeoutException(); + return _status; + } + } + + public boolean isCancelled() { + return false; + } + + public boolean isDone() { + return _status != null; + } + + void done(Status status) { + synchronized (this) { + _status = status; + this.notifyAll(); + } + } + } + + }
