Author: rr
Date: Wed Sep  9 19:16:51 2009
New Revision: 813084

URL: http://svn.apache.org/viewvc?rev=813084&view=rev
Log:
ODE-483: Instance replayer - added failure dispatching and a bugfix for
replaying multiple instances at once

Modified:
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java
    ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/Replayer.java
 Wed Sep  9 19:16:51 2009
@@ -34,6 +34,7 @@
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.bpel.engine.BpelEngineImpl;
 import org.apache.ode.bpel.iapi.BpelEngine;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
 import org.apache.ode.bpel.pmapi.CommunicationType;
 import org.apache.ode.bpel.pmapi.ExchangeType;
@@ -125,6 +126,8 @@
         CommunicationType result = CommunicationType.Factory.newInstance();
         List<Exchange> list = new ArrayList<Exchange>();
         ProcessInstanceDAO instance = conn.getInstance(iid);
+        if (instance == null)
+            return result;
         result.setProcessType(instance.getProcess().getType());
 
         for (String mexId : instance.getMessageExchangeIds()) {
@@ -140,13 +143,18 @@
                 __log.error("", e1);
             }
             try {
-                if (mexDao.getResponse() != null) {
-                    if ("FAULT".equals(mexDao.getStatus())) {
-                        Fault f = e.addNewFault();
-                        f.setType(mexDao.getFault());
-                        f.setExplanation(mexDao.getFaultExplanation());
+                Status status = Status.valueOf(mexDao.getStatus());
+                if (status == Status.FAULT) {
+                    Fault f = e.addNewFault();
+                    f.setType(mexDao.getFault());
+                    f.setExplanation(mexDao.getFaultExplanation());
+                    if (mexDao.getResponse() != null) {
                         
f.setMessage(XmlObject.Factory.parse(mexDao.getResponse().getData()));
-                    } else {
+                    }
+                } else if (status == Status.FAILURE) {
+                    
e.addNewFailure().setExplanation(mexDao.getFaultExplanation());
+                } else {
+                    if (mexDao.getResponse() != null) {
                         
e.setOut(XmlObject.Factory.parse(mexDao.getResponse().getData()));
                     }
                 }

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerBpelRuntimeContextImpl.java
 Wed Sep  9 19:16:51 2009
@@ -85,7 +85,7 @@
     public String invoke(int aid, PartnerLinkInstance partnerLink, Operation 
operation, Element outgoingMessage, InvokeResponseChannel channel) throws 
FaultException {
         __log.debug("invoke");
 
-        Exchange answer = 
replayerContext.answers.fetchAnswer(partnerLink.partnerLink.partnerRolePortType.getQName(),
 operation.getName());
+        Exchange answer = 
replayerContext.answers.fetchAnswer(partnerLink.partnerLink.partnerRolePortType.getQName(),
 operation.getName(), outgoingMessage, getCurrentEventDateTime());
 
         PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink);
 
@@ -127,6 +127,9 @@
                 }
                 mexDao.setResponse(response);
                 mexDao.setStatus(Status.RESPONSE.toString());
+            } else if (answer.isSetFailure()) {
+                
mexDao.setFaultExplanation(answer.getFailure().getExplanation());
+                mexDao.setStatus(Status.FAILURE.toString());
             } else {
                 // We don't have output for in-out operation - resulting with
                 // replayer error to the top
@@ -188,7 +191,7 @@
         MessageDAO message = 
mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput().getMessage().getQName());
         buildOutgoingMessage(message, msg);
 
-        __log.debug("reply mexRef:" + mexRef);
+        __log.debug("instance replied mexRef:" + mexRef + " " + 
DOMUtils.domToString(msg));
         mex.setResponse(message);
         mex.setStatus(Status.RESPONSE.toString());
     }
@@ -249,6 +252,7 @@
                     // Kill the route so some new message does not get routed 
to
                     // same process instance.
                     
routing.correlator.removeRoutes(routing.messageRoute.getGroupId(), _dao);
+
                     execute();
                     return true;
                 }

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerContext.java
 Wed Sep  9 19:16:51 2009
@@ -41,6 +41,8 @@
 import org.apache.ode.bpel.pmapi.ExchangeType;
 import org.apache.ode.bpel.pmapi.CommunicationType.Exchange;
 import org.apache.ode.bpel.runtime.PROCESS;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
 
 /**
  * Context holding replayer state (eg. invoke answers) for single instance 
during replaying.
@@ -77,14 +79,14 @@
             v.answers.add(e);
         }
 
-        public Exchange fetchAnswer(QName service, String operation) {
+        public Exchange fetchAnswer(QName service, String operation, Element 
outgointMessage, Date currentEventDateTime) {
             __log.debug("fetching answer for " + service + " " + operation);
             String key = getAnswersKey(service, operation);
             AnswersForKey v = answersMap.get(key);
-            if (v == null) {
-                throw new IllegalStateException("answer for " + service + " " 
+ operation + " not found");
+            Exchange e = v == null ? null : v.answerPos < v.answers.size() ? 
v.answers.get(v.answerPos) : null;
+            if (e == null) {
+                throw new IllegalStateException("answer for " + service + " " 
+ operation + " at time " + currentEventDateTime + " not found, outgoing 
message was " + DOMUtils.domToString(outgointMessage));
             }
-            Exchange e = v.answers.get(v.answerPos);
             v.answerPos++;
             __log.debug("fetched " + e);
             return e;
@@ -129,50 +131,67 @@
         }, time, runtimeContext);
     }
 
-    public void init(CommunicationType r, ReplayerScheduler scheduler) throws 
Exception {
+    public void init(final CommunicationType r, ReplayerScheduler scheduler) 
throws Exception {
         this.scheduler = scheduler;
-        List<Exchange> exchangeList = r.getExchangeList();
+        final List<Exchange> exchangeList = r.getExchangeList();
 
         for (int i = 1; i < exchangeList.size(); i++) {
             Exchange e = exchangeList.get(i);
-            if (e.getType() == ExchangeType.P) {
+            // We skip failures, because INVOKE_CHECK job is not handled by
+            // replayer
+            if (e.getType() == ExchangeType.P && !e.isSetFailure()) {
                 answers.add(e);
             }
         }
 
         {
             final Exchange e = exchangeList.get(0);
-            final BpelProcess p = 
bpelEngine.getNewestProcessByType(r.getProcessType());
-            final ProcessDAO processDAO = p.getProcessDAO();
-            final MyRoleMessageExchangeImpl mex = 
ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
-
-            p.invokeProcess(mex, new BpelProcess.InvokeHandler() {
-                public boolean invoke(PartnerLinkMyRoleImpl target, 
RoutingInfo routing, boolean createInstance) {
-                    if (routing.messageRoute == null && createInstance) {
-                        ProcessInstanceDAO newInstance = 
processDAO.createInstance(routing.correlator);
-
-                        runtimeContext = new ReplayerBpelRuntimeContextImpl(p, 
newInstance, new PROCESS(p.getOProcess()), mex, ReplayerContext.this);
-                        
runtimeContext.setCurrentEventDateTime(e.getCreateTime().getTime());
-                        runtimeContext.updateMyRoleMex(mex);
-                        // first receive is matched to provided mex
-                        runtimeContext.execute();
-                        return true;
-                    } else if (routing.messageRoute != null) {
-                        throw new IllegalStateException("Instantiating mex 
causes invocation of existing instance " + mex);
+
+            final Date time = e.getCreateTime().getTime();
+            scheduler.scheduleReplayerJob(new Callable<Void>() {
+                public Void call() throws Exception {
+                    __log.debug("initial call " + e);
+
+                    final BpelProcess p = 
bpelEngine.getNewestProcessByType(r.getProcessType());
+                    final ProcessDAO processDAO = p.getProcessDAO();
+                    final MyRoleMessageExchangeImpl mex = 
ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
+
+                    p.invokeProcess(mex,
+                    // time,
+                            new BpelProcess.InvokeHandler() {
+                                public boolean invoke(PartnerLinkMyRoleImpl 
target, RoutingInfo routing, boolean createInstance) {
+                                    if (routing.messageRoute == null && 
createInstance) {
+                                        ProcessInstanceDAO newInstance = 
processDAO.createInstance(routing.correlator);
+
+                                        runtimeContext = new 
ReplayerBpelRuntimeContextImpl(p, newInstance, new PROCESS(p.getOProcess()), 
mex,
+                                        // time,
+                                                ReplayerContext.this);
+                                        
runtimeContext.setCurrentEventDateTime(time);
+                                        runtimeContext.updateMyRoleMex(mex);
+                                        // first receive is matched to provided
+                                        // mex
+                                        runtimeContext.execute();
+                                        return true;
+                                    } else if (routing.messageRoute != null) {
+                                        throw new 
IllegalStateException("Instantiating mex causes invocation of existing instance 
" + mex);
+                                    }
+                                    return false;
+                                }
+                            });
+
+                    for (int i = 1; i < exchangeList.size(); i++) {
+                        Exchange e2 = exchangeList.get(i);
+                        if (e2.getType() == ExchangeType.M) {
+                            MyRoleMessageExchangeImpl mex2 = 
ReplayerBpelRuntimeContextImpl.createMyRoleMex(e2, bpelEngine);
+                            runtimeContext.updateMyRoleMex(mex2);
+                            scheduleInvoke(e2, mex2);
+                        }
                     }
-                    return false;
+                    return null;
                 }
-            });
+            }, time, null);
         }
 
-        for (int i = 1; i < exchangeList.size(); i++) {
-            Exchange e = exchangeList.get(i);
-            if (e.getType() == ExchangeType.M) {
-                MyRoleMessageExchangeImpl mex = 
ReplayerBpelRuntimeContextImpl.createMyRoleMex(e, bpelEngine);
-                runtimeContext.updateMyRoleMex(mex);
-                scheduleInvoke(e, mex);
-            }
-        }
     }
 
     public void run() throws Exception {

Modified: 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/replayer/ReplayerScheduler.java
 Wed Sep  9 19:16:51 2009
@@ -92,7 +92,9 @@
         while (!taskQueue.isEmpty()) {
             TaskElement taskElement = taskQueue.remove();
             __log.debug("executing action at time " + taskElement.when);
-            
taskElement.runtimeContext.setCurrentEventDateTime(taskElement.when);
+            if (taskElement.runtimeContext != null) {
+                
taskElement.runtimeContext.setCurrentEventDateTime(taskElement.when);
+            }
             taskElement.action.call();
         }
     }

Modified: ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd?rev=813084&r1=813083&r2=813084&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/pmapi.xsd Wed Sep  9 
19:16:51 2009
@@ -48,65 +48,73 @@
     </simpleType>
     
     <complexType name="GetCommunication">
-       <xs:sequence>
-               <xs:element name="iid" type="long" 
maxOccurs="unbounded"></xs:element>
-       </xs:sequence>
+        <xs:sequence>
+            <xs:element name="iid" type="long" 
maxOccurs="unbounded"></xs:element>
+        </xs:sequence>
     </complexType>
     
-       <xs:complexType name="GetCommunicationResponse">
-               <xs:sequence>
-               <element name="restoreInstance" minOccurs="0" 
maxOccurs="unbounded" type="pmapi:CommunicationType"/>
-               </xs:sequence>
-       </xs:complexType>
+    <xs:complexType name="GetCommunicationResponse">
+        <xs:sequence>
+            <element name="restoreInstance" minOccurs="0" 
maxOccurs="unbounded" type="pmapi:CommunicationType"/>
+        </xs:sequence>
+    </xs:complexType>
 
     <element name="getCommunicationResponse" 
type="pmapi:GetCommunicationResponse"/>
 
     <complexType name="CommunicationType">
-       <sequence>
-               <element name="processType" type="QName" />
+        <sequence>
+            <element name="processType" type="QName" />
 
-               <element name="serviceConfig" maxOccurs="unbounded">
-                       <complexType>
-                               <sequence>
-                                       <element name="name" type="QName" />
-                                       <element name="replayingType"
-                                               type="pmapi:ReplayType" />
-                               </sequence>
-                       </complexType>
-               </element>
+            <element name="serviceConfig" maxOccurs="unbounded">
+                <complexType>
+                    <sequence>
+                        <element name="name" type="QName" />
+                        <element name="replayingType"
+                            type="pmapi:ReplayType" />
+                    </sequence>
+                </complexType>
+            </element>
 
-               <element name="exchange" maxOccurs="unbounded">
-                       <complexType>
-                               <sequence>
-                                       <element name="type" 
type="pmapi:ExchangeType" />
-                                       <element name="createTime" 
type="dateTime" />
-                                       <element name="service" type="QName" />
-                                       <element name="operation" type="string" 
/>
-                                       <element name="in" type="anyType" />
-                                       <element name="out" type="anyType"
-                                               minOccurs="0" />
-                                       <element name="fault" minOccurs="0">
-                                               <complexType>
-                                                       <sequence>
-                                                               <element 
name="type" type="QName" />
-                                                               <element 
name="explanation"
-                                                                       
type="string" />
-                                                               <element 
name="message"
-                                                                       
type="anyType" />
-                                                       </sequence>
-                                               </complexType>
-                                       </element>
-                               </sequence>
-                       </complexType>
-               </element>
-       </sequence>
+            <element name="exchange" maxOccurs="unbounded">
+                <complexType>
+                    <sequence>
+                        <element name="type" type="pmapi:ExchangeType" />
+                        <element name="createTime" type="dateTime" />
+                        <element name="service" type="QName" />
+                        <element name="operation" type="string" />
+                        <element name="in" type="anyType" />
+                        <choice>
+                            <element name="out" type="anyType"/>
+                            <element name="fault">
+                                <complexType>
+                                    <sequence>
+                                        <element name="type" type="QName" />
+                                        <element name="explanation"
+                                            type="string" />
+                                        <element name="message"
+                                            type="anyType" />
+                                    </sequence>
+                                </complexType>
+                            </element>
+                            <element name="failure">
+                                <complexType>
+                                    <sequence>
+                                        <element name="explanation" 
type="string" />
+                                    </sequence>
+                                </complexType>
+                           </element>
+                        </choice>
+                    </sequence>
+                </complexType>
+            </element>
+        </sequence>
     </complexType>
 
     <complexType name="Replay">
         <sequence>
-               <element name="upgradeInstance" minOccurs="0" 
maxOccurs="unbounded" type="long"/>
-               <element name="replaceInstance" minOccurs="0" 
maxOccurs="unbounded" type="long"/>
-               <element name="restoreInstance" minOccurs="0" 
maxOccurs="unbounded" type="pmapi:CommunicationType"/>
+            <element name="upgradeInstance" minOccurs="0" 
maxOccurs="unbounded" type="long"/>
+            <element name="replaceInstance" minOccurs="0" 
maxOccurs="unbounded" type="long"/>
+            <element name="restoreInstance" minOccurs="0" 
maxOccurs="unbounded" type="pmapi:CommunicationType"/>
         </sequence>
     </complexType>
 


Reply via email to