chamikara 2005/01/10 23:22:29
Modified: sandesha/src/org/apache/sandesha/server/queue
ServerQueue.java
Log:
added method setAckReceived
Revision Changes Path
1.12 +57 -2
ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java
Index: ServerQueue.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/queue/ServerQueue.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- ServerQueue.java 10 Jan 2005 23:03:14 -0000 1.11
+++ ServerQueue.java 11 Jan 2005 07:22:28 -0000 1.12
@@ -821,15 +821,70 @@
}
+ public void setAckReceived(RMMessageContext responseMsg)
+ {
+ String requestMsgID =
responseMsg.getAddressingHeaders().getRelatesTo().toString();
+
+ Iterator it = outgoingMap.keySet().iterator();
+
+ String key =null;
+ while(it.hasNext()){
+ key = (String) it.next();
+ Object obj=outgoingMap.get(key);
+ if(obj!=null){
+ ResponseSequenceHash hash = (ResponseSequenceHash)obj;
+ boolean hasMsg = hash.hasMessageWithId(requestMsgID);
+ if(!hasMsg)
+ //set the property response received
+ hash.setAckReceived(requestMsgID);
+ }
+ }
+
+ }
- /*public Vector getAllIncommingMsgNumbers(String seqId){
+
+ public Vector getAllIncommingMsgNumbers(String seqId){
Vector msgNumbers = new Vector();
incomingMap.get(seqId);
//Not implemented yet.
return msgNumbers;
- }*/
+ }
+
+ public RMMessageContext getLowPriorityMessageIfAcked(){
+ int size = lowPriorityQueue.size();
+
+
+ RMMessageContext terminateMsg = null;
+ for(int i=0;i<size;i++) {
+ RMMessageContext temp;
+ temp = (RMMessageContext) lowPriorityQueue.get(i);
+ String seqId = temp.getSequenceID();
+
+ boolean foundSeq = false;
+ ResponseSequenceHash hash = null;
+ Iterator it1 = outgoingMap.keySet().iterator();
+ while(it1.hasNext()){
+ hash = (ResponseSequenceHash) it1.next();
+ if(hash.getSequenceId()==seqId){
+ foundSeq = true;
+ break;
+ }
+ }
+
+ if(foundSeq && hash!=null){
+ boolean complete = hash.isAckComplete();
+ if(complete)
+ lowPriorityQueue.remove(i);
+ terminateMsg = temp;
+ break;
+ }
+
+ }
+
+ return terminateMsg;
+ }
}