chamikara 2005/06/06 11:23:32
Modified: sandesha/src/org/apache/sandesha IStorageManager.java
sandesha/src/org/apache/sandesha/client
ClientStorageManager.java
sandesha/src/org/apache/sandesha/server
ServerStorageManager.java
sandesha/src/org/apache/sandesha/storage/dao
ISandeshaDAO.java SandeshaDatabaseDAO.java
SandeshaQueueDAO.java
sandesha/src/org/apache/sandesha/storage/queue
IncomingSequence.java SandeshaQueue.java
Log:
The queue classes were changed to give ack messages, not only after a
ackRequested or lastMessage, but also when a certain time period (ack interval)
has been passed since the last message arrived.
Revision Changes Path
1.26 +2 -0
ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java
Index: IStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -r1.25 -r1.26
--- IStorageManager.java 25 May 2005 10:08:59 -0000 1.25
+++ IStorageManager.java 6 Jun 2005 18:23:32 -0000 1.26
@@ -112,4 +112,6 @@
void clearStorage();
boolean isSequenceComplete(String seqId);
+
+ void sendAck(String sequenceId);
}
\ No newline at end of file
1.42 +10 -0
ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java
Index: ClientStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java,v
retrieving revision 1.41
retrieving revision 1.42
diff -u -r1.41 -r1.42
--- ClientStorageManager.java 29 May 2005 11:22:04 -0000 1.41
+++ ClientStorageManager.java 6 Jun 2005 18:23:32 -0000 1.42
@@ -99,6 +99,10 @@
* responses received from the server side.
*/
public void addAcknowledgement(RMMessageContext rmMessageContext) {
+ String sequenceID = rmMessageContext.getSequenceID();
+ if(sequenceID!=null)
+ accessor.removeAllAcks(sequenceID);
+
addPriorityMessage(rmMessageContext);
}
@@ -254,6 +258,7 @@
return;
Long msgNo = new Long(messageNumber);
accessor.addMessageToIncomingSequence(sequenceId, msgNo,
rmMessageContext);
+ accessor.updateFinalMessageArrivedTime(sequenceId);
}
@@ -400,6 +405,11 @@
boolean incomingTerminateReceived =
accessor.isIncommingTerminateReceived(seqId);
return outTerminateSent && incomingTerminateReceived;
}
+
+ public void sendAck(String sequenceId) {
+ String keyId = accessor.getKeyFromIncomingSequenceId(sequenceId);
+ accessor.sendAck(keyId);
+ }
}
\ No newline at end of file
1.34 +11 -4
ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
Index: ServerStorageManager.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
retrieving revision 1.33
retrieving revision 1.34
diff -u -r1.33 -r1.34
--- ServerStorageManager.java 29 May 2005 11:22:04 -0000 1.33
+++ ServerStorageManager.java 6 Jun 2005 18:23:32 -0000 1.34
@@ -191,6 +191,10 @@
* @see
org.apache.sandesha.IStorageManager#addAcknowledgement(org.apache.sandesha.RMMessageContext)
*/
public void addAcknowledgement(RMMessageContext rmMessageContext) {
+ String sequenceID = rmMessageContext.getSequenceID();
+ if(sequenceID!=null)
+ accessor.removeAllAcks(sequenceID);
+
addPriorityMessage(rmMessageContext);
}
@@ -259,6 +263,7 @@
Long msgNo = new Long(messageNumber);
accessor.addMessageToIncomingSequence(sequenceId, msgNo,
rmMessageContext);
+ accessor.updateFinalMessageArrivedTime(sequenceId);
}
@@ -367,11 +372,11 @@
}
public void addOffer(String msgID, String offerID) {
- //To change body of implemented methods use File | Settings | File
Templates.
+
}
public String getOffer(String msgID) {
- return null; //To change body of implemented methods use File |
Settings | File Templates.
+ return null;
}
public void clearStorage(){
@@ -379,8 +384,10 @@
}
public boolean isSequenceComplete(String seqId) {
- return false; //To change body of implemented methods use File |
Settings | File Templates.
+ return false;
}
-
+ public void sendAck(String sequenceId) {
+ accessor.sendAck(sequenceId);
+ }
}
\ No newline at end of file
1.15 +6 -0
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java
Index: ISandeshaDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- ISandeshaDAO.java 25 May 2005 10:09:00 -0000 1.14
+++ ISandeshaDAO.java 6 Jun 2005 18:23:32 -0000 1.15
@@ -337,4 +337,10 @@
public boolean isOutgoingTerminateSent(String seqId);
public boolean isIncommingTerminateReceived(String seqId);
+
+ public void updateFinalMessageArrivedTime(String sequenceID);
+
+ public void sendAck(String sequenceId);
+
+ public void removeAllAcks(String sequenceID);
}
\ No newline at end of file
1.15 +9 -0
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java
Index: SandeshaDatabaseDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- SandeshaDatabaseDAO.java 24 May 2005 06:07:40 -0000 1.14
+++ SandeshaDatabaseDAO.java 6 Jun 2005 18:23:32 -0000 1.15
@@ -19,6 +19,7 @@
import org.apache.axis.components.logger.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.sandesha.RMMessageContext;
+import org.apache.sandesha.storage.queue.SandeshaQueue;
import java.util.Iterator;
import java.util.Set;
@@ -305,4 +306,12 @@
return false; //To change body of implemented methods use File |
Settings | File Templates.
}
+ public void updateFinalMessageArrivedTime(String sequenceID) {
+ }
+
+ public void sendAck(String sequenceId) {
+ }
+
+ public void removeAllAcks(String sequenceID){
+ }
}
\ No newline at end of file
1.16 +15 -1
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java
Index: SandeshaQueueDAO.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- SandeshaQueueDAO.java 24 May 2005 06:07:40 -0000 1.15
+++ SandeshaQueueDAO.java 6 Jun 2005 18:23:32 -0000 1.16
@@ -421,5 +421,19 @@
SandeshaQueue sq = SandeshaQueue.getInstance(this.endPoint);
return sq.isIncommingTerminateReceived(seqId);
}
-
+
+ public void updateFinalMessageArrivedTime(String sequenceID) {
+ SandeshaQueue sq = SandeshaQueue.getInstance(this.endPoint);
+ sq.updateFinalMessageArrivedTime(sequenceID);
+ }
+
+ public void sendAck(String sequenceId) {
+ SandeshaQueue sq = SandeshaQueue.getInstance(this.endPoint);
+ sq.sendAck(sequenceId);
+ }
+
+ public void removeAllAcks(String sequenceID){
+ SandeshaQueue sq = SandeshaQueue.getInstance(this.endPoint);
+ sq.removeAllAcks(sequenceID);
+ }
}
1.9 +28 -1
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java
Index: IncomingSequence.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- IncomingSequence.java 16 May 2005 06:08:17 -0000 1.8
+++ IncomingSequence.java 6 Jun 2005 18:23:32 -0000 1.9
@@ -20,6 +20,7 @@
import org.apache.axis.components.logger.LogFactory;
import org.apache.axis.message.addressing.RelatesTo;
import org.apache.commons.logging.Log;
+import org.apache.sandesha.Constants;
import org.apache.sandesha.RMMessageContext;
import java.util.*;
@@ -43,8 +44,34 @@
private long lastMsgNo = -1;
private String acksTo = null;
private String offer;
- //private Set msgNumbers;
+ private long finalMsgArrivedTime = 0; //this is the time the latest
application msg was arrived (
+ private long finalAckedTime = 0;
+ private boolean sendAck = false;
+ public long getFinalAckedTime() {
+ return finalAckedTime;
+ }
+
+ public void setFinalAckedTime(long finalAckedTime) {
+ this.finalAckedTime = finalAckedTime;
+ }
+
+ public long getFinalMsgArrivedTime() {
+ return finalMsgArrivedTime;
+ }
+
+ public void setFinalMsgArrivedTime(long finalMsgArrivedTime) {
+ this.finalMsgArrivedTime = finalMsgArrivedTime;
+ }
+
+ public boolean isSendAck() {
+ return sendAck;
+ }
+
+ public void setSendAck(boolean sendAck) {
+ this.sendAck = sendAck;
+ }
+
public String getOffer() {
return offer;
}
1.23 +83 -2
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
Index: SandeshaQueue.java
===================================================================
RCS file:
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -r1.22 -r1.23
--- SandeshaQueue.java 24 May 2005 06:07:40 -0000 1.22
+++ SandeshaQueue.java 6 Jun 2005 18:23:32 -0000 1.23
@@ -23,9 +23,12 @@
import org.apache.commons.logging.Log;
import org.apache.sandesha.Constants;
import org.apache.sandesha.RMMessageContext;
+import org.apache.sandesha.util.PolicyLoader;
import org.apache.sandesha.ws.rm.AcknowledgementRange;
import org.apache.sandesha.ws.rm.SequenceAcknowledgement;
+import sun.nio.cs.HistoricallyNamedCharset;
+
import java.util.*;
@@ -321,7 +324,44 @@
}
break;
-
+ case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
+
+ //acks are send in the folowing manner.
+ //If a ack the system has asked to send a
ack (sequence.sendAck==true)
+ //then send it immediately.
+ //Also send a ack when a interval
(ACKNOWLEDGEMENT_INTERVAL) has passed
+ //since last message arrived.
+
+ String sequenceId = tempMsg.getSequenceID();
+ if(sequenceId==null)
+ continue;
+
+ String key =
getKeyFromIncomingSequenceId(sequenceId);
+ IncomingSequence sequence = (IncomingSequence)
incomingMap.get(key);
+ if(sequence==null)
+ continue;
+
+ d = new Date ();
+ currentTime = d.getTime();
+
+ if(sequence.isSendAck()){
+
+ tempMsg.setLastSentTime(currentTime);
+ msg = tempMsg;
+ sequence.setSendAck(false);
+ sequence.setFinalAckedTime(currentTime);
+ break forLoop;
+
+ }else{
+ long ackInterval =
PolicyLoader.getInstance().getAcknowledgementInterval();
+ long finalAckedTime =
sequence.getFinalAckedTime();
+ long finalMsgArrivedTime =
sequence.getFinalMsgArrivedTime();
+
+ if((finalMsgArrivedTime>finalAckedTime)
&& (currentTime>finalMsgArrivedTime+ackInterval))
+ sequence.setSendAck(true);
+ }
+
+ break;
default:
highPriorityQueue.remove(i);
queueBin.put(tempMsg.getMessageID(),
tempMsg);
@@ -576,11 +616,18 @@
}
public void movePriorityMsgToBin(String messageId) {
+
synchronized (highPriorityQueue) {
int size = highPriorityQueue.size();
for (int i = 0; i < size; i++) {
RMMessageContext msg = (RMMessageContext)
highPriorityQueue.get(i);
- String tempMsgId = (String) msg.getMessageIdList().get(0);
+
+ String tempMsgId;
+ try{
+ tempMsgId = (String) msg.getMessageIdList().get(0);
+ }catch(Exception ex){
+ tempMsgId = msg.getMessageID();
+ }
if (tempMsgId.equals(messageId)) {
highPriorityQueue.remove(i);
queueBin.put(messageId, msg);
@@ -1068,6 +1115,40 @@
}
}
+
+ public void updateFinalMessageArrivedTime(String sequenceId){
+ synchronized (incomingMap) {
+ IncomingSequence ics = (IncomingSequence)
incomingMap.get(sequenceId);
+ if(ics==null)
+ return;
+
+ Date d = new Date();
+ long time = d.getTime();
+ ics.setFinalMsgArrivedTime(time);
+ }
+ }
+
+ public void sendAck(String sequenceId){
+ synchronized (incomingMap) {
+ IncomingSequence ics = (IncomingSequence)
incomingMap.get(sequenceId);
+ if(ics==null)
+ return;
+
+ ics.setSendAck(true);
+ }
+ }
+
+ public void removeAllAcks (String sequenceID){
+ synchronized (highPriorityQueue){
+ int size = highPriorityQueue.size();
+
+ for(int i=0;i<size;i++){
+ RMMessageContext msg = (RMMessageContext)
highPriorityQueue.get(i);
+ if(msg.getSequenceID().equals(sequenceID) &&
msg.getMessageType()==Constants.MSG_TYPE_ACKNOWLEDGEMENT)
+ highPriorityQueue.remove(i);
+ }
+ }
+ }
}