jaliya      2005/02/25 05:41:48

  Modified:    sandesha/src/org/apache/sandesha IStorageManager.java
                        RMInitiator.java
               sandesha/src/org/apache/sandesha/client
                        ClientPropertyValidator.java
                        ClientStorageManager.java RMSender.java
               sandesha/src/org/apache/sandesha/server RMInvoker.java
                        Sender.java ServerStorageManager.java
               sandesha/src/org/apache/sandesha/server/msgprocessors
                        AcknowledgementProcessor.java
                        TerminateSequenceProcessor.java
               sandesha/src/org/apache/sandesha/storage/dao
                        ISandeshaDAO.java SandeshaDatabaseDAO.java
                        SandeshaQueueDAO.java
               sandesha/src/org/apache/sandesha/storage/queue
                        IncomingSequence.java OutgoingSequence.java
                        SandeshaQueue.java
               sandesha/src/org/apache/sandesha/util RMMessageCreator.java
               sandesha/src/org/apache/sandesha/ws/rm/providers
                        RMProvider.java
  Added:       sandesha/src/org/apache/sandesha RMReport.java RMStatus.java
  Log:
  Functionality-wise, the code is now complete. The only work that remains is 
to make the required minor changes to Sandesha according to the latest 
released WS-RM spec.
  
  Revision  Changes    Path
  1.17      +6 -0      
ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java
  
  Index: IStorageManager.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/IStorageManager.java,v
  retrieving revision 1.16
  retrieving revision 1.17
  diff -u -r1.16 -r1.17
  --- IStorageManager.java      20 Feb 2005 17:42:38 -0000      1.16
  +++ IStorageManager.java      25 Feb 2005 13:41:45 -0000      1.17
  @@ -151,4 +151,10 @@
   
       public String getOutgoingSeqenceIdOfIncomingMsg(RMMessageContext msg);
   
  +    public void setTerminateSend(String seqId);
  +
  +    public void setTerminateReceived(String seqId);
  +
  +    public String getKeyFromOutgoingSeqId(String seqId);
  +
   }
  \ No newline at end of file
  
  
  
  1.5       +15 -14    ws-fx/sandesha/src/org/apache/sandesha/RMInitiator.java
  
  Index: RMInitiator.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/RMInitiator.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- RMInitiator.java  24 Feb 2005 13:51:13 -0000      1.4
  +++ RMInitiator.java  25 Feb 2005 13:41:45 -0000      1.5
  @@ -26,7 +26,7 @@
   import org.apache.sandesha.server.Sender;
   import org.apache.sandesha.server.ServerStorageManager;
   import org.apache.sandesha.ws.rm.providers.RMProvider;
  -import org.apache.util.PropertyLoader;
  +import org.apache.sandesha.util.PropertyLoader;
   import org.w3c.dom.Document;
   
   import javax.xml.namespace.QName;
  @@ -97,23 +97,24 @@
   
           //This should check whether we have received all the acks or 
reponses if any
           IStorageManager storageManager = new ClientStorageManager();
  +        storageManager.isAllSequenceComplete();
   
  -//        while(!storageManager.isAllSequenceComplete()){
  -//            try {
  -//                System.out.println("Checking to stop the 
client......................");
  -//                Thread.sleep(1000);
  -//            } catch (InterruptedException e) {
  -//                e.printStackTrace();  //To change body of catch statement 
use File | Settings | File Templates.
  -//            }
  -//        }
  +        while(!storageManager.isAllSequenceComplete()){
  +            try {
  +                System.out.println("Checking to stop the 
client......................");
  +                Thread.sleep(1000);
  +            } catch (InterruptedException e) {
  +                e.printStackTrace();  //To change body of catch statement 
use File | Settings | File Templates.
  +            }
  +        }
   
           if (listenerStarted)
               sas.stop();
  -        try {
  -            Thread.sleep(Constants.CLIENT_WAIT_PERIOD_FOR_COMPLETE);
  -        } catch (InterruptedException e) {
  -            e.printStackTrace();  //To change body of catch statement use 
File | Settings | File Templates.
  -        }
  +//        try {
  +//            Thread.sleep(Constants.CLIENT_WAIT_PERIOD_FOR_COMPLETE);
  +//        } catch (InterruptedException e) {
  +//            e.printStackTrace();  //To change body of catch statement use 
File | Settings | File Templates.
  +//        }
           sender.setRunning(false);
           return new RMStatus();
   
  
  
  
  1.1                  ws-fx/sandesha/src/org/apache/sandesha/RMReport.java
  
  Index: RMReport.java
  ===================================================================
  package org.apache.sandesha;
  
  /**
   * Stub report object. In this revision both accessors return fixed
   * values; no state is stored or consulted.
   */
  public class RMReport {

      /** Fixed answer given by {@link #isAllAcked()} in this revision. */
      private static final boolean ALL_ACKED = false;

      /** Fixed answer given by {@link #getNumberOfReturnMessages()} in this revision. */
      private static final int RETURN_MESSAGE_COUNT = 0;

      /**
       * Reports the acknowledgement flag of this report.
       *
       * @return always {@code false} in this revision
       */
      public boolean isAllAcked() {
          return ALL_ACKED;
      }

      /**
       * Reports the number of return messages recorded by this report.
       *
       * @return always {@code 0} in this revision
       */
      public int getNumberOfReturnMessages() {
          return RETURN_MESSAGE_COUNT;
      }

  }
  
  
  
  1.1                  ws-fx/sandesha/src/org/apache/sandesha/RMStatus.java
  
  Index: RMStatus.java
  ===================================================================
  package org.apache.sandesha;
  
  /**
   * Stub status object. In this revision it holds no state: the completion
   * flag is fixed and every report request yields a new {@code RMReport}.
   */
  public class RMStatus {

      /**
       * Reports the completion flag of this status.
       *
       * @return always {@code false} in this revision
       */
      public boolean isComplete() {
          final boolean complete = false;
          return complete;
      }

      /**
       * Supplies the report associated with this status.
       *
       * @return a freshly constructed {@code RMReport} on every call
       */
      public RMReport getReport() {
          final RMReport report = new RMReport();
          return report;
      }

  }
  
  
  
  1.11      +1 -1      
ws-fx/sandesha/src/org/apache/sandesha/client/ClientPropertyValidator.java
  
  Index: ClientPropertyValidator.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientPropertyValidator.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- ClientPropertyValidator.java      24 Feb 2005 13:51:13 -0000      1.10
  +++ ClientPropertyValidator.java      25 Feb 2005 13:41:46 -0000      1.11
  @@ -20,7 +20,7 @@
   import org.apache.axis.client.Call;
   import org.apache.sandesha.Constants;
   import org.apache.sandesha.RMMessageContext;
  -import org.apache.util.PropertyLoader;
  +import org.apache.sandesha.util.PropertyLoader;
   
   import javax.xml.namespace.QName;
   import java.net.InetAddress;
  
  
  
  1.23      +20 -7     
ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java
  
  Index: ClientStorageManager.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/ClientStorageManager.java,v
  retrieving revision 1.22
  retrieving revision 1.23
  diff -u -r1.22 -r1.23
  --- ClientStorageManager.java 24 Feb 2005 13:51:13 -0000      1.22
  +++ ClientStorageManager.java 25 Feb 2005 13:41:46 -0000      1.23
  @@ -274,13 +274,15 @@
        * @see org.apache.sandesha.IStorageManager#isAllSequenceComplete()
        */
       public boolean isAllSequenceComplete() {
  -        boolean result=false;
  -        Iterator ite=accessor.getAllOutgoingSequences();
  -       while(ite.hasNext()){
  -            result=isAckComplete((String)ite.next());
  -       }
   
  -        return result;
  +        boolean outTerminateSent = accessor.isAllOutgoingTerminateSent();
  +
  +        boolean incomingTerminateReceived = 
accessor.isAllIncommingTerminateReceived();
  +
  +        if(outTerminateSent && incomingTerminateReceived)
  +            return true;
  +        else
  +            return false;
       }
   
       /* (non-Javadoc)
  @@ -373,5 +375,16 @@
           return accessor.searchForSequenceId(msgId);
       }
   
  -    
  +    public void setTerminateSend(String seqId) {
  +        accessor.setTerminateSend(seqId);
  +    }
  +
  +    public void setTerminateReceived(String seqId) {
  +        accessor.setTerminateReceived(seqId);
  +    }
  +
  +    public String getKeyFromOutgoingSeqId(String seqId){
  +        return accessor.getKeyFromOutgoingSequenceId(seqId);
  +    }
  +
   }
  \ No newline at end of file
  
  
  
  1.27      +1 -1      
ws-fx/sandesha/src/org/apache/sandesha/client/RMSender.java
  
  Index: RMSender.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/client/RMSender.java,v
  retrieving revision 1.26
  retrieving revision 1.27
  diff -u -r1.26 -r1.27
  --- RMSender.java     24 Feb 2005 13:51:13 -0000      1.26
  +++ RMSender.java     25 Feb 2005 13:41:46 -0000      1.27
  @@ -22,7 +22,7 @@
   import org.apache.sandesha.Constants;
   import org.apache.sandesha.IStorageManager;
   import org.apache.sandesha.RMMessageContext;
  -import org.apache.util.RMMessageCreator;
  +import org.apache.sandesha.util.RMMessageCreator;
   
   public class RMSender extends BasicHandler {
   
  
  
  
  1.13      +23 -17    
ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java
  
  Index: RMInvoker.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/RMInvoker.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- RMInvoker.java    21 Feb 2005 12:08:21 -0000      1.12
  +++ RMInvoker.java    25 Feb 2005 13:41:46 -0000      1.13
  @@ -20,11 +20,13 @@
   import org.apache.axis.MessageContext;
   import org.apache.axis.components.uuid.UUIDGen;
   import org.apache.axis.components.uuid.UUIDGenFactory;
  +import org.apache.axis.message.addressing.AddressingHeaders;
   import org.apache.axis.providers.java.RPCProvider;
   import org.apache.sandesha.Constants;
   import org.apache.sandesha.EnvelopeCreator;
   import org.apache.sandesha.IStorageManager;
   import org.apache.sandesha.RMMessageContext;
  +import org.apache.sandesha.util.RMMessageCreator;
   
   import javax.xml.soap.SOAPEnvelope;
   
  @@ -71,15 +73,28 @@
                       // runtime.
                       RPCProvider rpcProvider = new RPCProvider();
   
  -                    if (rmMessageContext.isLastMessage()) {
  -                        //Insert Terminate Sequnce.
  -                                               
storageManager.insertTerminateSeqMessage(getTerminateSeqMessage(rmMessageContext));
  -                    }
  +
                       rpcProvider.invoke(rmMessageContext.getMsgContext());
                   
                       //Check whether we have an output (response) or not.
   
                       if 
(rmMessageContext.getMsgContext().getOperation().getMethod().getReturnType() != 
Void.TYPE) {
  +                        if (rmMessageContext.isLastMessage()) {
  +                            //Insert Terminate Sequnce.
  +                            AddressingHeaders addrHeaders = 
rmMessageContext.getAddressingHeaders();
  +
  +                            if (addrHeaders.getReplyTo() != null) {
  +                                String replyTo = 
addrHeaders.getReplyTo().getAddress().toString();
  +
  +                                 RMMessageContext terminateMsg= 
RMMessageCreator.createTerminateSeqMsg(rmMessageContext);
  +                            terminateMsg.setOutGoingAddress(replyTo);
  +                            
storageManager.insertTerminateSeqMessage(terminateMsg);
  +                            }else{
  +                                System.out.println("SERVER ERROR , CANNOT 
SEND THE TERMINTATION");
  +                                //TODO LOG THE ERROR
  +                            }
  +
  +                        }
                           //System.out
                           //        .println("STORING THE RESPONSE 
MESSAGE.....\n");
                           //Store the message in the response queue.
  @@ -114,6 +129,8 @@
   
                           if (firstMsgOfResponseSeq) {
                               // System.out.println("NO RESPONSE SEQUENCE");
  +
  +
                               RMMessageContext rmMsgContext = new 
RMMessageContext();
                               rmMessageContext.copyContents(rmMsgContext);
   
  @@ -137,7 +154,7 @@
                               
storageManager.setTemporaryOutSequence(rmMsgContext
                                       .getSequenceID(), Constants.UUID + id);
                               SOAPEnvelope createSequenceEnvelope = 
EnvelopeCreator.createCreateSequenceEnvelope(id,
  -                                            rmMsgContext, Constants.SERVER);
  +                                    rmMsgContext, Constants.SERVER);
   
                               
rmMsgContext.getMsgContext().setRequestMessage(new 
Message(createSequenceEnvelope));
   
  @@ -172,16 +189,5 @@
   
       }
   
  -    private RMMessageContext getTerminateSeqMessage(RMMessageContext 
rmMessageContext) {
  -        RMMessageContext terSeqRMMsgContext = new RMMessageContext();
  -        MessageContext terSeqMsgContext = new 
MessageContext(rmMessageContext.getMsgContext().getAxisEngine());
  -        terSeqRMMsgContext.setSequenceID(rmMessageContext.getSequenceID());
  -        
terSeqRMMsgContext.setAddressingHeaders(rmMessageContext.getAddressingHeaders());
  -        //RMMessageContext.copyMessageContext(msgContext, messageContext);
  -        
terSeqRMMsgContext.setOutGoingAddress(rmMessageContext.getOutGoingAddress());
  -        terSeqRMMsgContext.setMsgContext(terSeqMsgContext);
  -        
terSeqRMMsgContext.setMessageType(Constants.MSG_TYPE_TERMINATE_SEQUENCE);
  -        // TODO Auto-generated method stub
  -        return terSeqRMMsgContext;
  -    }
  +
   }
  \ No newline at end of file
  
  
  
  1.24      +1 -0      ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java
  
  Index: Sender.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/Sender.java,v
  retrieving revision 1.23
  retrieving revision 1.24
  diff -u -r1.23 -r1.24
  --- Sender.java       24 Feb 2005 13:51:13 -0000      1.23
  +++ Sender.java       25 Feb 2005 13:41:46 -0000      1.24
  @@ -265,6 +265,7 @@
                   {
                       System.out.println("INFO: SENDING TERMINATE SEQUENCE 
REQUEST ....");
                       sendTerminateSequenceRequest(rmMessageContext);
  +                    
storageManager.setTerminateSend(storageManager.getKeyFromOutgoingSeqId(rmMessageContext.getSequenceID()));
                       break;
                   }
               case Constants.MSG_TYPE_ACKNOWLEDGEMENT:
  
  
  
  1.21      +15 -1     
ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java
  
  Index: ServerStorageManager.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/ServerStorageManager.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -r1.20 -r1.21
  --- ServerStorageManager.java 20 Feb 2005 17:42:38 -0000      1.20
  +++ ServerStorageManager.java 25 Feb 2005 13:41:46 -0000      1.21
  @@ -37,6 +37,14 @@
   
   public class ServerStorageManager implements IStorageManager {
   
  +    public void setTerminateSend(String seqId) {
  +        //To change body of implemented methods use File | Settings | File 
Templates.
  +    }
  +
  +    public void setTerminateReceived(String seqId) {
  +        //To change body of implemented methods use File | Settings | File 
Templates.
  +    }
  +
       protected static Log log = LogFactory.getLog(ServerStorageManager.class
               .getName());
       private String tempSeqId = null; // used by getNextMessageToProcess();
  @@ -74,7 +82,8 @@
   
       public void setAcknowledged(String seqID, long msgNumber) {
           //TODO decide this in implementing the ServerSender.
  -        accessor.moveOutgoingMessageToBin(seqID, new Long(msgNumber));
  +        //accessor.moveOutgoingMessageToBin(seqID, new Long(msgNumber));
  +         accessor.markOutgoingMessageToDelete(seqID, new Long(msgNumber));
       }
   
       public void init() {
  @@ -103,6 +112,8 @@
           msg = accessor.getNextPriorityMessageContextToSend();
           if (msg == null)
               msg = accessor.getNextOutgoingMsgContextToSend();
  +         if (msg == null)
  +            msg = accessor.getNextLowPriorityMessageContextToSend();
           return msg;
   
       }
  @@ -383,4 +394,7 @@
           return accessor.hasLastIncomingMsgReceived(seqId);
       }
   
  +    public String getKeyFromOutgoingSeqId(String seqId) {
  +        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
  +    }
   }
  \ No newline at end of file
  
  
  
  1.4       +5 -7      
ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/AcknowledgementProcessor.java
  
  Index: AcknowledgementProcessor.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/AcknowledgementProcessor.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- AcknowledgementProcessor.java     20 Feb 2005 08:49:45 -0000      1.3
  +++ AcknowledgementProcessor.java     25 Feb 2005 13:41:46 -0000      1.4
  @@ -69,16 +69,15 @@
           //else set the response env of the messageContext.
           String seqID = rmMessageContext.getSequenceID();
   
  -        long messageNumber = rmMessageContext.getRMHeaders().getSequence()
  -                .getMessageNumber().getMessageNumber();
  +        long messageNumber = rmMessageContext.getRMHeaders().getSequence() 
.getMessageNumber().getMessageNumber();
           //Assume that the list is sorted and in the ascending order.
           Map listOfMsgNumbers = storageManager.getListOfMessageNumbers(seqID);
   
           if (null == listOfMsgNumbers)
               System.out.println("MSG Number list is NULL");
  -        else {
  -            Iterator ite = listOfMsgNumbers.keySet().iterator();
  -        }
  +//        else {
  +//            Iterator ite = listOfMsgNumbers.keySet().iterator();
  +//        }
   
           Vector ackRangeVector = null;
           if (listOfMsgNumbers != null) {
  @@ -124,8 +123,7 @@
   
       private static RMMessageContext getAckRMMsgCtx(RMMessageContext 
rmMessageContext, Vector ackRangeVector) {
   
  -        SOAPEnvelope ackEnvelope = EnvelopeCreator
  -                .createAcknowledgementEnvelope(rmMessageContext, 
ackRangeVector);
  +        SOAPEnvelope ackEnvelope = 
EnvelopeCreator.createAcknowledgementEnvelope(rmMessageContext, ackRangeVector);
           //Add the envelope to the response message of the messageContext.
           //rmMessageContext.getMsgContext().setResponseMessage(new
           // Message(ackEnvelope));
  
  
  
  1.3       +3 -0      
ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/TerminateSequenceProcessor.java
  
  Index: TerminateSequenceProcessor.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/server/msgprocessors/TerminateSequenceProcessor.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- TerminateSequenceProcessor.java   18 Feb 2005 06:57:17 -0000      1.2
  +++ TerminateSequenceProcessor.java   25 Feb 2005 13:41:46 -0000      1.3
  @@ -39,6 +39,7 @@
   
           if (terminateSeq != null && terminateSeq.getIdentifier() != null) {
               String seqID = terminateSeq.getIdentifier().getIdentifier();
  +            storageManger.setTerminateReceived(seqID);
           }
   
   
  @@ -50,4 +51,6 @@
           return false;
       }
   
  +
  +
   }
  \ No newline at end of file
  
  
  
  1.6       +7 -0      
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java
  
  Index: ISandeshaDAO.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/ISandeshaDAO.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- ISandeshaDAO.java 24 Feb 2005 13:51:14 -0000      1.5
  +++ ISandeshaDAO.java 25 Feb 2005 13:41:46 -0000      1.6
  @@ -113,5 +113,12 @@
   
       public Iterator getAllOutgoingSequences();
   
  +    public void setTerminateSend (String seqId);
  +
  +    public void setTerminateReceived (String seqId);
  +
  +    public boolean isAllOutgoingTerminateSent();
  +
  +    public boolean isAllIncommingTerminateReceived();
   
   }
  \ No newline at end of file
  
  
  
  1.6       +25 -0     
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java
  
  Index: SandeshaDatabaseDAO.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaDatabaseDAO.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- SandeshaDatabaseDAO.java  24 Feb 2005 13:51:14 -0000      1.5
  +++ SandeshaDatabaseDAO.java  25 Feb 2005 13:41:46 -0000      1.6
  @@ -44,6 +44,14 @@
           // TODO Auto-generated method stub
       }
   
  +    public boolean isOutgoingTerminateSent() {
  +        return false;  //To change body of implemented methods use File | 
Settings | File Templates.
  +    }
  +
  +    public boolean isIncommingTerminateReceived() {
  +        return false;  //To change body of implemented methods use File | 
Settings | File Templates.
  +    }
  +
       public void addRequestedSequence(String seqId) {
           //To change body of implemented methods use File | Settings | File 
Templates.
       }
  @@ -391,4 +399,21 @@
       public Iterator getAllOutgoingSequences() {
           return null;  //To change body of implemented methods use File | 
Settings | File Templates.
       }
  +
  +    public void setTerminateSend(String seqId) {
  +        //To change body of implemented methods use File | Settings | File 
Templates.
  +    }
  +
  +    public void setTerminateReceived(String seqId) {
  +        //To change body of implemented methods use File | Settings | File 
Templates.
  +    }
  +
  +    public boolean isAllOutgoingTerminateSent() {
  +        return false;  //To change body of implemented methods use File | 
Settings | File Templates.
  +    }
  +
  +    public boolean isAllIncommingTerminateReceived() {
  +        return false;  //To change body of implemented methods use File | 
Settings | File Templates.
  +    }
  +
   }
  \ No newline at end of file
  
  
  
  1.7       +21 -0     
ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java
  
  Index: SandeshaQueueDAO.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/dao/SandeshaQueueDAO.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SandeshaQueueDAO.java     24 Feb 2005 13:51:14 -0000      1.6
  +++ SandeshaQueueDAO.java     25 Feb 2005 13:41:46 -0000      1.7
  @@ -449,4 +449,25 @@
          SandeshaQueue sq=SandeshaQueue.getInstance();
          return sq.getAllOutgoingSequences();
       }
  +
  +    public boolean isAllOutgoingTerminateSent() {
  +       SandeshaQueue sq=SandeshaQueue.getInstance();
  +       return sq.isAllOutgoingTerminateSent();
  +    }
  +
  +    public boolean isAllIncommingTerminateReceived() {
  +        SandeshaQueue sq=SandeshaQueue.getInstance();
  +        return sq.isAllIncommingTerminateReceived();
  +    }
  +
  +    public void setTerminateSend(String seqId) {
  +        SandeshaQueue sq=SandeshaQueue.getInstance();
  +        sq.setTerminateSend(seqId);
  +    }
  +
  +    public void setTerminateReceived(String seqId) {
  +        SandeshaQueue sq=SandeshaQueue.getInstance();
  +        sq.setTerminateReceived(seqId);
  +    }
  +
   }
  
  
  
  1.3       +12 -1     
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java
  
  Index: IncomingSequence.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/IncomingSequence.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- IncomingSequence.java     20 Feb 2005 17:42:38 -0000      1.2
  +++ IncomingSequence.java     25 Feb 2005 13:41:47 -0000      1.3
  @@ -42,7 +42,18 @@
       private HashMap hash;
       private boolean beingProcessedLock = false; //When true messages are
       private long lastMsgNo = -1;
  -    
  +
  +
  +    private boolean terminateReceived = false;
  +
  +    public boolean isTerminateReceived() {
  +        return terminateReceived;
  +    }
  +
  +    public void setTerminateReceived(boolean terminateReceived) {
  +        this.terminateReceived = terminateReceived;
  +    }
  +
       private static final Log log = 
LogFactory.getLog(IncomingSequence.class.getName());
   
       public IncomingSequence(String sequenceId) {
  
  
  
  1.3       +19 -1     
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/OutgoingSequence.java
  
  Index: OutgoingSequence.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/OutgoingSequence.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- OutgoingSequence.java     20 Feb 2005 17:42:38 -0000      1.2
  +++ OutgoingSequence.java     25 Feb 2005 13:41:47 -0000      1.3
  @@ -47,6 +47,24 @@
       private long lastMsgNo = -1;
       private long nextAutoNumber; // key for storing messages.
       private static final Log log = 
LogFactory.getLog(OutgoingSequence.class.getName());
  +    public boolean terminateSent = false;
  +    private boolean hasResponse = false;
  +
  +    public boolean hasResponse() {
  +        return hasResponse;
  +    }
  +
  +    public void setHasResponse(boolean hasResponse) {
  +        this.hasResponse = hasResponse;
  +    }
  +
  +    public boolean isTerminateSent() {
  +        return terminateSent;
  +    }
  +
  +    public void setTerminateSent(boolean terminateSent) {
  +        this.terminateSent = terminateSent;
  +    }
   
       public OutgoingSequence(String sequenceId) {
           this.sequenceId = sequenceId;
  @@ -268,7 +286,7 @@
   
       public boolean isAckComplete() {
           try {
  -            long lastMsgNo = getLastMessage();
  +            long lastMsgNo = getLastMsgNumber();
               if (lastMsgNo <= 0) {
                   return false;
               }
  
  
  
  1.7       +73 -21    
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
  
  Index: SandeshaQueue.java
  ===================================================================
  RCS file: 
/home/cvs/ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- SandeshaQueue.java        24 Feb 2005 13:51:14 -0000      1.6
  +++ SandeshaQueue.java        25 Feb 2005 13:41:47 -0000      1.7
  @@ -66,12 +66,12 @@
        * This will not replace messages automatically.
        */
   
  -    public Iterator getAllOutgoingSequences(){
  +    public Iterator getAllOutgoingSequences() {
           return outgoingMap.keySet().iterator();
       }
   
       public boolean addMessageToIncomingSequence(String seqId, Long messageNo,
  -            RMMessageContext msgCon) throws QueueException {
  +                                                RMMessageContext msgCon) 
throws QueueException {
           boolean successful = false;
   
           if (seqId == null || msgCon == null)
  @@ -124,6 +124,9 @@
                   if (msgCon.isLastMessage())
                       resSeqHash.setLastMsg(msgCon.getMsgNumber());
   
  +                if (msgCon.isHasResponse())
  +                    resSeqHash.setHasResponse(true);
  +
               }
           }
           return successful;
  @@ -241,6 +244,7 @@
           synchronized (highPriorityQueue) {
               if (msg == null)
                   throw new QueueException(Constants.Queue.MESSAGE_ID_NULL);
  +
               highPriorityQueue.add(msg);
           }
       }
  @@ -253,6 +257,7 @@
           }
       }
   
  +
       public RMMessageContext nextPriorityMessageToSend() throws 
QueueException {
   
           synchronized (highPriorityQueue) {
  @@ -268,22 +273,22 @@
                       RMMessageContext tempMsg = (RMMessageContext) 
highPriorityQueue.get(i);
                       if (tempMsg != null) {
                           switch (tempMsg.getMessageType()) {
  -                        //Create seq messages will not be removed.
  -                        case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
  -                            long lastSentTime = tempMsg.getLastSentTime();
  -                            Date d = new Date();
  -                            long currentTime = d.getTime();
  -                            if (currentTime >= lastSentTime + 
Constants.RETRANSMISSION_INTERVAL) {
  -                                tempMsg.setLastSentTime(currentTime);
  +                            //Create seq messages will not be removed.
  +                            case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
  +                                long lastSentTime = 
tempMsg.getLastSentTime();
  +                                Date d = new Date();
  +                                long currentTime = d.getTime();
  +                                if (currentTime >= lastSentTime + 
Constants.RETRANSMISSION_INTERVAL) {
  +                                    tempMsg.setLastSentTime(currentTime);
  +                                    msg = tempMsg;
  +                                    break forLoop;
  +                                }
  +                                break;
  +                            default:
  +                                highPriorityQueue.remove(i);
  +                                queueBin.put(tempMsg.getMessageID(), 
tempMsg);
                                   msg = tempMsg;
                                   break forLoop;
  -                            }
  -                            break;
  -                        default:
  -                            highPriorityQueue.remove(i);
  -                            queueBin.put(tempMsg.getMessageID(), tempMsg);
  -                            msg = tempMsg;
  -                            break forLoop;
                           }
                       }
                   }
  @@ -630,7 +635,7 @@
                   while (temp <= ackRng.getMaxValue()) {
                       Long lng = new Long(temp);
                       if (!msgNumbers.contains(lng)) //vector cant hv duplicate
  -                        // entries.
  +                    // entries.
                           msgNumbers.add(new Long(temp));
                       temp++;
                   }
  @@ -664,7 +669,7 @@
                   OutgoingSequence hash = (OutgoingSequence) obj;
                   boolean hasMsg = hash.hasMessageWithId(requestMsgID);
                   if (!hasMsg)
  -                    //set the property response received
  +                //set the property response received
                       hash.setResponseReceived(requestMsgID);
               }
           }
  @@ -816,8 +821,9 @@
   
       public String getKeyFromOutgoingSequenceId(String seqId) {
           Iterator it = outgoingMap.keySet().iterator();
  +        String key = null;
           while (it.hasNext()) {
  -            String key = (String) it.next();
  +            key = (String) it.next();
               OutgoingSequence os = (OutgoingSequence) outgoingMap.get(key);
   
               String seq = os.getSequenceId();
  @@ -825,9 +831,55 @@
                   continue;
   
               if (seq.equals(seqId))
  -                return key;
  +                break;
  +
           }
  -        return null;
  +        return key;
  +    }
  +
  +    public boolean isAllOutgoingTerminateSent() {
  +        synchronized (outgoingMap) {
  +            Iterator keys = outgoingMap.keySet().iterator();
  +
  +            while (keys.hasNext()) {
  +                OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(keys.next());
  +                if (!ogs.isTerminateSent())
  +                    return false;
  +            }
  +
  +            return true;
  +        }
  +    }
  +
  +    public boolean isAllIncommingTerminateReceived() {
  +        synchronized (incomingMap) {
  +            Iterator keys = incomingMap.keySet().iterator();
  +
  +            while (keys.hasNext()) {
  +                Object key = keys.next();
  +                IncomingSequence ics = (IncomingSequence) incomingMap.get(key);
  +                OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(key);
  +
  +                boolean hasResponse = ogs.hasResponse();
  +
  +                if (hasResponse && !ics.isTerminateReceived())
  +                    return false;
  +            }
  +
  +            return true;
  +        }
  +    }
  +
  +    public void setTerminateSend(String seqId) {
  +        OutgoingSequence ogs = (OutgoingSequence) outgoingMap.get(seqId);
  +        ogs.setTerminateSent(true);
       }
  +
  +    public void setTerminateReceived(String seqId) {
  +        IncomingSequence ics = (IncomingSequence) incomingMap.get(getKeyFromIncomingSequenceId(seqId));
  +        ics.setTerminateReceived(true);
  +    }
  +
  +
   }
   
  
  
  
  1.2       +7 -2      ws-fx/sandesha/src/org/apache/sandesha/util/RMMessageCreator.java
  
  Index: RMMessageCreator.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/util/RMMessageCreator.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- RMMessageCreator.java     25 Feb 2005 02:53:07 -0000      1.1
  +++ RMMessageCreator.java     25 Feb 2005 13:41:47 -0000      1.2
  @@ -14,6 +14,8 @@
   import org.apache.sandesha.EnvelopeCreator;
   import org.apache.sandesha.client.ClientPropertyValidator;
   
  +import java.util.Vector;
  +
   /**
    * Created by IntelliJ IDEA.
    * User: Jaliya
  @@ -47,8 +49,7 @@
   
           //Set the outgoing address these need to be corrected.
           createSeqRMMsgContext.setOutGoingAddress(toAddress);
  -        SOAPEnvelope resEnvelope = EnvelopeCreator.createCreateSequenceEnvelope(uuid,
  -                createSeqRMMsgContext, Constants.CLIENT);
  +        SOAPEnvelope resEnvelope = EnvelopeCreator.createCreateSequenceEnvelope(uuid,createSeqRMMsgContext, Constants.CLIENT);
           MessageContext createSeqMsgContext = new MessageContext(msgContext.getAxisEngine());
   
           //This should be a clone operation.
  @@ -77,6 +78,10 @@
       }
   
       public static RMMessageContext createAcknowledgementMsg(RMMessageContext rmMsgCtx) throws Exception {
  +
  +         
  +
  +
           return new RMMessageContext();
       }
   
  
  
  
  1.35      +5 -4      ws-fx/sandesha/src/org/apache/sandesha/ws/rm/providers/RMProvider.java
  
  Index: RMProvider.java
  ===================================================================
  RCS file: /home/cvs/ws-fx/sandesha/src/org/apache/sandesha/ws/rm/providers/RMProvider.java,v
  retrieving revision 1.34
  retrieving revision 1.35
  diff -u -r1.34 -r1.35
  --- RMProvider.java   21 Feb 2005 12:08:21 -0000      1.34
  +++ RMProvider.java   25 Feb 2005 13:41:47 -0000      1.35
  @@ -23,10 +23,7 @@
   import org.apache.axis.message.addressing.AddressingHeaders;
   import org.apache.axis.providers.java.RPCProvider;
   import org.apache.commons.logging.Log;
  -import org.apache.sandesha.IStorageManager;
  -import org.apache.sandesha.RMException;
  -import org.apache.sandesha.RMInitiator;
  -import org.apache.sandesha.RMMessageContext;
  +import org.apache.sandesha.*;
   import org.apache.sandesha.server.MessageValidator;
   import org.apache.sandesha.server.RMMessageProcessorIdentifier;
   import org.apache.sandesha.server.msgprocessors.FaultProcessor;
  @@ -92,6 +89,10 @@
           try {
               if (!rmMessageProcessor.processMessage(rmMessageContext)) {
                   msgContext.setResponseMessage(null);
  +            }else{
  +                // TODO Get the from envecreator
  +                
  +                // SOAPEnvelope resEn=EnvelopeCreator.createAcknowledgementEnvelope()
               }
            } catch (AxisFault af) {
                     RMProvider.log.error(af);
  
  
  

Reply via email to