jaliya      2005/02/16 00:04:33

  Added:       sandesha/src/org/apache/sandesha/storage/queue
                        QueueException.java ResponseSequenceHash.java
                        SandeshaQueue.java SequenceHash.java
  Log:
  Refactored the code, Manly change the package structure for the Storage
  
  Revision  Changes    Path
  1.1                  
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/QueueException.java
  
  Index: QueueException.java
  ===================================================================
  /*
   * Copyright  1999-2004 The Apache Software Foundation.
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   *
   */
  package org.apache.sandesha.storage.queue;
  
  /**
   * @author Chamikara Jayalath
   * @author Jaliya Ekanayaka
   */
  
  public class QueueException extends Exception {
  
      public QueueException(String msg) {
          super(msg);
  
      }
  }
  
  
  1.1                  
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/ResponseSequenceHash.java
  
  Index: ResponseSequenceHash.java
  ===================================================================
  /*
   * Copyright  1999-2004 The Apache Software Foundation.
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   *
   */
  
  package org.apache.sandesha.storage.queue;
  
  import org.apache.sandesha.Constants;
  import org.apache.sandesha.RMMessageContext;
  
  import java.util.*;
  
  
  /*
   * Created on Aug 4, 2004 at 5:08:29 PM
   */
  
  /**
   * @author Chamikara Jayalath
   * @author Jaliya Ekanayaka
   */
  
  /**
   * This class works as a hash map for storing response messages until they are
   * sent.
   */
  public class ResponseSequenceHash {
  
      //following concepts was removed from highPriorityQueue.
      //This was to reduce complexity. (Since here time is also taken into
      // account)
      //private long lastProcessed;
      //private boolean hasMessagesToSend;
  
      private String sequenceId;
  
      private String outSequenceId;
  
      private boolean outSeqApproved;
  
      private HashMap hash;
  
      private Vector markedAsDelete;
  
      private long nextAutoNumber; // key for storing messages.
      //--> USING AUTONUMBER FOR MESSAGENUMBERS
      // private long nextMessageNumber;
      
      public ResponseSequenceHash(String sequenceId) {
          //lastProcessed = 0;
          //hasMessagesToSend = false;
          this.sequenceId = sequenceId;
          hash = new HashMap();
          markedAsDelete = new Vector();
          nextAutoNumber = 1; //This is the key for storing messages.
          outSeqApproved = false;
      }
  
      /*
       * public boolean hasMessagesToSend(){ return hasMessagesToSend; }
       */
  
      public String getSequenceId() {
          return sequenceId;
      }
  
      public void setSequenceId(String sequenceId) {
          this.sequenceId = sequenceId;
      }
  
      public boolean isOutSeqApproved() {
          return outSeqApproved;
      }
  
      public void setOutSeqApproved(boolean b) {
          outSeqApproved = b;
      }
  
      public String getOutSequenceId() {
          return outSequenceId;
      }
  
      public void setOutSequenceId(String string) {
          outSequenceId = string;
      }
  
      /**
       * adds the message to map.
       */
      public Object putNewMessage(RMMessageContext msg) {
          Long key = new Long(nextAutoNumber);
          Object obj = hash.put(key, msg);
          increaseAutoNo();
          //refreshHasProcessableMessages();
          return obj;
      }
  
      /**
       * Increases auto number by 1.
       */
      private void increaseAutoNo() {
          nextAutoNumber++;
      }
  
      /**
       * Removes a message from the hash map.
       */
      public boolean removeMessage(long id) {
          //TODO: Add messageremoving code if needed.
          boolean removed = false;
  
          Long key = new Long(id);
          Object obj = hash.remove(key);
  
          if (obj != null)
              removed = true;
  
          return removed;
      }
  
      /**
       * Returns the key of the next message to be sent.
       */
      /*
       * public long getNextMessageKeyToSend(){
       * 
       * long id = lastProcessed+1;
       * 
       * return id; }
       */
  
      /**
       * Returns the next deliverable message if has any. Otherwise returns 
null.
       */
      public RMMessageContext getNextMessageToSend() {
          //Long nextKey = new Long (lastProcessed+1);
          //RMMessageContext msg = (RMMessageContext) hash.get(nextKey);
  
          //RMMessageContext msg = null;
          RMMessageContext minMsg = null;
          Iterator keys = hash.keySet().iterator();
  
          whileLoop: while (keys.hasNext()) {
              RMMessageContext tempMsg;
              tempMsg = (RMMessageContext) hash.get(keys.next());
              
              //EDITED FOR MARKASDELETE
              Long msgNo = new Long(tempMsg.getMsgNumber());
              if (markedAsDelete.contains(msgNo)) {
                  System.out.println("mark as delete contains " + msgNo);
                  continue;
              }
              //System.out.println("mark as delete does not contains "+msgNo);
              
              //END - EDITED FOR MARKASDELETE
              
              long lastSentTime = tempMsg.getLastSentTime();
              //System.out.println("last sent time: "+lastSentTime);
              Date d = new Date();
              long currentTime = d.getTime();
              //System.out.println("current time: "+currentTime);
              // System.out.println("difference: "+(currentTime-lastSentTime));
              if (currentTime >= lastSentTime + 
Constants.RETRANSMISSION_INTERVAL) {
                  if (minMsg == null)
                      minMsg = tempMsg;
                  else {
                      long msgNo1, msgNo2;
                      msgNo1 = tempMsg.getMsgNumber();
                      msgNo2 = minMsg.getMsgNumber();
                      if (msgNo1 < msgNo2)
                          minMsg = tempMsg;
                  }
              }
          }
  
          Date d = new Date();
          long time = d.getTime();
          if (minMsg != null) {
              minMsg.setLastSentTime(time);
          }
  
          return minMsg;
      }
  
      /**
       * Gives all the deliverable messages of this sequence. Resturns a vector.
       */
      /*
       * public Vector getNextMessagesToSend(){
       * 
       * boolean done = false; Vector messages = new Vector();
       * 
       * while(!done){ Long nextKey = new Long(lastProcessed+1); Object obj =
       * hash.get(nextKey); if(obj!=null){ messages.add(obj);
       * incrementProcessedCount(); }else{ done=true; //To exit the loop. } }
       * refreshHasProcessableMessages();
       * 
       * return messages; }
       */
  
      /*
       * private void incrementProcessedCount(){ lastProcessed++; }
       */
  
      /*
       * private void refreshHasProcessableMessages(){ Long nextKey = new
       * Long(lastProcessed+1); hasMessagesToSend = hash.containsKey(nextKey); }
       */
  
      public boolean hasMessage(Long key) {
          Object obj = hash.get(key);
  
          return (!(obj == null));
      }
  
      public void clearSequence(boolean yes) {
          if (!yes)
              return;
  
          hash.clear();
          //lastProcessed = 0;
          //hasMessagesToSend = false;
          nextAutoNumber = 1;
          outSeqApproved = false;
          outSequenceId = null;
          sequenceId = null;
      }
  
      public Set getAllKeys() {
          return hash.keySet();
      }
  
      public String getMessageId(Long key) {
          RMMessageContext msg = (RMMessageContext) hash.get(key);
          if (msg == null)
              return null;
  
          return msg.getMessageID();
  
      }
  
      //Deleting returns the deleted message.
      public RMMessageContext deleteMessage(Long msgId) {
          RMMessageContext msg = (RMMessageContext) hash.get(msgId);
  
          if (msg == null)
              return null;
  
          hash.remove(msgId);
          return msg;
      }
  
      public boolean markMessageDeleted(Long messageNo) {
          if (hash.containsKey(messageNo)) {
              markedAsDelete.add(messageNo);
              String msgId = ((RMMessageContext) 
hash.get(messageNo)).getMessageID();
              System.out.println("INFO: Marking outgoing message deleted : 
msgId "
                      + msgId);
              return true;
          }
          return false;
      }
  
      public long nextMessageNumber() {
          return nextAutoNumber;
      }
  
      public boolean isMessagePresent(String msgId) {
          boolean b = false;
  
          b = hash.containsKey(msgId);
          return b;
      }
  
      public boolean hasMessageWithId(String msgId) {
          //boolean b = false;
          
          Iterator it = hash.keySet().iterator();
          while (it.hasNext()) {
  
              RMMessageContext msg = (RMMessageContext) hash.get(it.next());
              if (msg.getMessageID().equals(msgId))
                  return true;
  
          }
          return false;
      }
  
      public Vector getReceivedMsgNumbers() {
  
          Vector result = new Vector();
          Iterator it = hash.keySet().iterator();
  
          while (it.hasNext()) {
              Object key = it.next();
              RMMessageContext msg = (RMMessageContext) hash.get(key);
              long l = msg.getMsgNumber();
              result.add(new Long(l));
          }
  
          return result;
      }
  
      public void setResponseReceived(String msgID) {
          Iterator it = hash.keySet().iterator();
  
          while (it.hasNext()) {
              RMMessageContext msg = (RMMessageContext) hash.get(it.next());
              if (msg.getMessageID().equals(msgID))
                  msg.setResponseReceived(true);
          }
      }
  
      public void setAckReceived(String msgID) {
          Iterator it = hash.keySet().iterator();
  
          while (it.hasNext()) {
              RMMessageContext msg = (RMMessageContext) hash.get(it.next());
              if (msg.getMessageID().equals(msgID))
                  msg.setAckReceived(true);
          }
      }
  
      public void setAckReceived(long msgNo) {
  
          RMMessageContext msg = (RMMessageContext) hash.get(new Long(msgNo));
          if (msg != null) {
              msg.setAckReceived(true);
          } else
              System.out.println("ERROR: MESSAGE IS NULL IN ResponseSeqHash");
  
      }
  
      public boolean isAckComplete() {
          try {
              long lastMsgNo = getLastMessage();
              if (lastMsgNo <= 0) {
                  return false;
              }
              Iterator it = hash.keySet().iterator();
              for (long i = 1; i < lastMsgNo; i++) {
                  if (!hasMessage(new Long(i))) {
                      return false;
                  }
              }
  
              it = hash.keySet().iterator();
              while (it.hasNext()) {
                  RMMessageContext msg = (RMMessageContext) hash.get(it.next());
                  if (!msg.isAckReceived()) {
                      return false;
                  }
              }
  
              return true;
          } catch (Exception e) {
              e.printStackTrace();
              return false;
          }
      }
  
      private long getLastMessage() {
          Iterator it = hash.keySet().iterator();
          while (it.hasNext()) {
              RMMessageContext msg = (RMMessageContext) hash.get(it.next());
              if (msg.isLastMessage()) {
                  return msg.getMsgNumber();
              }
          }
  
          return -1;
      }
  
  }
  
  
  1.1                  
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SandeshaQueue.java
  
  Index: SandeshaQueue.java
  ===================================================================
  /*
   * Copyright  1999-2004 The Apache Software Foundation.
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   *
   */
  
  package org.apache.sandesha.storage.queue;
  
  
  import org.apache.sandesha.Constants;
  import org.apache.sandesha.RMMessageContext;
  import org.apache.sandesha.ws.rm.AcknowledgementRange;
  import org.apache.sandesha.ws.rm.SequenceAcknowledgement;
  
  import java.util.*;
  
  /*
   * Created on Aug 4, 2004 at 4:49:49 PM
   */
  
  /**
   * @author Chamikara Jayalath
   * @author Jaliya Ekanayaka
   */
  
  public class SandeshaQueue {
  
      private static SandeshaQueue queue = null;
  
      HashMap incomingMap; //In comming messages.
  
      HashMap outgoingMap; //Response messages
  
      ArrayList highPriorityQueue; // Acks and create seq. responses.
  
      HashMap queueBin; // Messaged processed from out queue will be moved
  
      ArrayList lowPriorityQueue;
      // to this.
  
      // to this.
  
      private SandeshaQueue() {
          incomingMap = new HashMap();
          outgoingMap = new HashMap();
          highPriorityQueue = new ArrayList();
          queueBin = new HashMap();
          lowPriorityQueue = new ArrayList();
      }
  
      public static SandeshaQueue getInstance() {
          if (queue == null) {
              queue = new SandeshaQueue();
          }
          return queue;
      }
  
      /**
       * This will not replace messages automatically.
       */
      public boolean addMessageToIncomingSequence(String seqId, Long messageNo, 
RMMessageContext msgCon) throws QueueException {
          boolean successful = false;
  
          if (seqId == null || msgCon == null)
              throw new QueueException("Error in adding message");
  
          if (isIncomingSequenceExists(seqId)) {
              SequenceHash seqHash = (SequenceHash) incomingMap.get(seqId);
  
              synchronized (seqHash) {
  
                  if (seqHash == null)
                      throw new QueueException("Inconsistent queue");
  
                  if (seqHash.hasMessage(messageNo))
                      throw new QueueException("Message already exists");
                  //Messages will not be replaced automatically.
  
                  seqHash.putNewMessage(messageNo, msgCon);
              }
          }
  
          return successful;
      }
  
      /**
       *  
       */
      public boolean addMessageToOutgoingSequence(String seqId,
                                                  RMMessageContext msgCon) 
throws QueueException {
          boolean successful = false;
  
          if (seqId == null || msgCon == null)
              throw new QueueException("Error in adding message");
  
          if (isOutgoingSequenceExists(seqId)) {
              ResponseSequenceHash resSeqHash = (ResponseSequenceHash) 
outgoingMap
                      .get(seqId);
  
              synchronized (resSeqHash) {
  
                  if (resSeqHash == null)
                      throw new QueueException("Inconsistent queue");
                  resSeqHash.putNewMessage(msgCon);
              }
          }
  
          return successful;
      }
  
      public boolean messagePresentInIncomingSequence(String sequenceId,
                                                      Long messageNo) throws 
QueueException {
  
          SequenceHash seqHash = (SequenceHash) incomingMap.get(sequenceId);
  
          if (seqHash == null)
              throw new QueueException("Sequence not present");
  
          synchronized (seqHash) {
              return seqHash.hasMessage(messageNo);
          }
      }
  
      public boolean isIncomingSequenceExists(String seqId) {
  
          synchronized (incomingMap) {
  
              return incomingMap.containsKey(seqId);
          }
      }
  
      public boolean isOutgoingSequenceExists(String resSeqId) {
  
          synchronized (outgoingMap) {
  
              return outgoingMap.containsKey(resSeqId);
          }
      }
  
      public String nextIncomingSequenceIdToProcess() {
  
          synchronized (incomingMap) {
  
              int count = incomingMap.size();
              Iterator it = incomingMap.keySet().iterator();
              SequenceHash sh = null;
              String seqId = null;
  
              whileLoop: while (it.hasNext()) {
                  String tempSeqId = (String) it.next();
                  sh = (SequenceHash) incomingMap.get(tempSeqId);
                  if (sh.hasProcessableMessages()) {
                      seqId = tempSeqId;
                      break whileLoop;
                  }
              }
  
              return seqId;
          }
      }
  
      public RMMessageContext nextIncomingMessageToProcess(String sequenceId)
              throws QueueException {
  
          if (sequenceId == null)
              return null;
  
          SequenceHash sh = (SequenceHash) incomingMap.get(sequenceId);
  
          synchronized (sh) {
  
              if (sh == null)
                  throw new QueueException("Sequence id does not exist");
  
              if (!sh.hasProcessableMessages())
                  return null;
  
              RMMessageContext msgCon = sh.getNextMessageToProcess();
              return msgCon;
  
          }
      }
  
      public RMMessageContext nextOutgoingMessageToSend() throws QueueException 
{
  
          RMMessageContext msg = null;
  
          synchronized (outgoingMap) {
  
              Iterator it = outgoingMap.keySet().iterator();
  
              whileLoop: while (it.hasNext()) {
                  RMMessageContext tempMsg;
                  String tempKey = (String) it.next();
                  ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                          .get(tempKey);
                  if (rsh.isOutSeqApproved()) {
                      tempMsg = rsh.getNextMessageToSend();
                      if (tempMsg != null) {
                          msg = tempMsg;
                          msg.setSequenceID(rsh.getOutSequenceId());
                          msg.setOldSequenceID(rsh.getSequenceId());
                          break whileLoop;
                      }
                  }
              }
          }
          return msg;
      }
  
      public void createNewIncomingSequence(String sequenceId)
              throws QueueException {
          if (sequenceId == null)
              throw new QueueException("Sequence Id is null");
  
          synchronized (incomingMap) {
  
              SequenceHash sh = new SequenceHash(sequenceId);
              incomingMap.put(sequenceId, sh);
  
          }
      }
  
      public void createNewOutgoingSequence(String sequenceId)
              throws QueueException {
          if (sequenceId == null)
              throw new QueueException("Sequence Id is null");
  
          synchronized (outgoingMap) {
  
              ResponseSequenceHash rsh = new ResponseSequenceHash(sequenceId);
              outgoingMap.put(sequenceId, rsh);
          }
  
      }
  
      /**
       * Adds a new message to the responses queue.
       */
      public void addPriorityMessage(RMMessageContext msg) throws 
QueueException {
  
          synchronized (highPriorityQueue) {
  
              if (msg == null)
                  throw new QueueException("Message is null");
  
              highPriorityQueue.add(msg);
          }
      }
  
  
      public void addLowPriorityMessage(RMMessageContext msg) throws 
QueueException {
  
          synchronized (lowPriorityQueue) {
  
              if (msg == null)
                  throw new QueueException("Message is null");
  
              lowPriorityQueue.add(msg);
          }
      }
  
  
      public RMMessageContext nextPriorityMessageToSend() throws QueueException 
{
  
          synchronized (highPriorityQueue) {
  
              if (highPriorityQueue.size() <= 0)
                  return null;
  
              //RMMessageContext msg = (RMMessageContext) 
highPriorityQueue.get(0);
              RMMessageContext msg = null;
              int size = highPriorityQueue.size();
  
              synchronized (highPriorityQueue) {
  
                  forLoop: //Label
                  for (int i = 0; i < size; i++) {
                      RMMessageContext tempMsg = (RMMessageContext) 
highPriorityQueue
                              .get(i);
                      if (tempMsg != null) {
  
                          switch (tempMsg.getMessageType()) {
                              //Create seq messages will not be removed.
                              case Constants.MSG_TYPE_CREATE_SEQUENCE_REQUEST:
                                  long lastSentTime = tempMsg.getLastSentTime();
                                  Date d = new Date();
                                  long currentTime = d.getTime();
                                  if (currentTime >= lastSentTime
                                          + Constants.RETRANSMISSION_INTERVAL) {
                                      tempMsg.setLastSentTime(currentTime);
                                      msg = tempMsg;
                                      break forLoop;
                                  }
                                  break;
  
                                  //Other msgs will be removed.
                                  //These include CreareSeqResponses and
                                  // Acknowledgements.
                              default:
                                  highPriorityQueue.remove(i);
                                  queueBin.put(tempMsg.getMessageID(), tempMsg);
                                  msg = tempMsg;
                                  break forLoop;
                          }
  
                      }
                  }
              }
  
              return msg;
  
          }
      }
      
      
  
      /*
       * public RMMessageContext getNextToProcessIfHasNew(String sequenceId){
       * SequenceHash sh = (SequenceHash) incomingMap.get(sequenceId);
       * if(sh==null) return null;
       * 
       * synchronized (sh) { if(!sh.hasNewMessages()) return null;
       * 
       * Long key = sh. } }
       */
  
      public Vector nextAllMessagesToProcess(String sequenceId)
              throws QueueException {
          SequenceHash sh = (SequenceHash) incomingMap.get(sequenceId);
  
          synchronized (sh) {
              Vector v = sh.getNextMessagesToProcess();
              return v;
          }
      }
  
      //Folowing func. may cause errors.
      /*
       * public Vector nextAllResponseMessagesToSend(String sequenceId) throws
       * QueueException{ ResponseSequenceHash rsh = (ResponseSequenceHash)
       * outgoingMap.get(sequenceId); Vector v = new Vector(); synchronized 
(rsh){
       * RMMessageContext msg = nextAllResponseMessagesToSend()
       * 
       * while(msg!=null){ v.add(msg); msg = rsh.getNextMessageToSend(); } 
return
       * v; } }
       */
  
      public Vector nextAllSeqIdsToProcess() {
          Vector ids = new Vector();
  
          synchronized (incomingMap) {
              Iterator it = incomingMap.keySet().iterator();
  
              while (it.hasNext()) {
                  Object tempKey = it.next();
                  SequenceHash sh = (SequenceHash) incomingMap.get(tempKey);
                  if (sh.hasProcessableMessages() && !sh.isSequenceLocked())
                      ids.add(sh.getSequenceId());
              }
              return ids;
          }
      }
  
      /*
       * public Vector nextAllResponseSeqIdsToSend(){ Vector ids = new Vector();
       * 
       * synchronized (outgoingMap){ Iterator it =
       * outgoingMap.keySet().iterator();
       * 
       * while(it.hasNext()){ Object tempKey = it.next(); ResponseSequenceHash 
sh =
       * (ResponseSequenceHash) outgoingMap.get(tempKey);
       * if(sh.hasProcessableMessages()) ids.add(sh.getSequenceId()); } } return
       * ids; }
       */
  
      public void clear(boolean yes) {
          if (!yes)
              return;
  
          incomingMap.clear();
          highPriorityQueue.clear();
          outgoingMap.clear();
          queueBin.clear();
      }
  
      public void removeAllMsgsFromIncomingSeqence(String seqId, boolean yes) {
          if (!yes)
              return;
  
          SequenceHash sh = (SequenceHash) incomingMap.get(seqId);
          sh.clearSequence(yes);
      }
  
      public void removeAllMsgsFromOutgoingSeqence(String seqId, boolean yes) {
          if (!yes)
              return;
  
          ResponseSequenceHash sh = (ResponseSequenceHash) 
outgoingMap.get(seqId);
          sh.clearSequence(yes);
      }
  
      public void removeIncomingSequence(String sequenceId, boolean yes) {
          if (!yes)
              return;
  
          incomingMap.remove(sequenceId);
      }
  
      public void removeOutgoingSequence(String sequenceId, boolean yes) {
          if (!yes)
              return;
  
          synchronized (outgoingMap) {
              outgoingMap.remove(sequenceId);
          }
      }
  
      public void setSequenceLock(String sequenceId, boolean lock) {
          SequenceHash sh = (SequenceHash) incomingMap.get(sequenceId);
          sh.setProcessLock(lock);
      }
  
      public Set getAllReceivedMsgNumsOfIncomingSeq(String sequenceId) {
          Vector v = new Vector();
          SequenceHash sh = (SequenceHash) incomingMap.get(sequenceId);
          if (sh != null)
              return sh.getAllKeys();
          else
              return null;
      }
  
      public Set getAllReceivedMsgNumsOfOutgoingSeq(String sequenceId) {
          Vector v = new Vector();
          ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                  .get(sequenceId);
          synchronized (rsh) {
              return rsh.getAllKeys();
          }
      }
  
      public boolean isIncomingMessageExists(String sequenceId, Long messageNo) 
{
          SequenceHash sh = (SequenceHash) incomingMap.get(sequenceId);
          //sh can be null if there are no messages at the initial point.
          if (sh != null)
              return sh.hasMessage(messageNo);
          else
              return false;
      }
  
      public void setOutSequence(String seqId, String outSeqId) {
          ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                  .get(seqId);
  
          if (rsh == null) {
              System.out.println("ERROR: RESPONSE SEQ IS NULL");
              return;
          }
  
          synchronized (rsh) {
              rsh.setOutSequenceId(outSeqId);
          }
      }
  
      public void setOutSequenceApproved(String seqId, boolean approved) {
          ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                  .get(seqId);
  
          if (rsh == null) {
              System.out.println("ERROR: RESPONSE SEQ IS NULL");
              return;
          }
          synchronized (rsh) {
              rsh.setOutSeqApproved(approved);
          }
      }
  
      public String getSequenceOfOutSequence(String outSequence) {
  
          if (outSequence == null) {
              return null;
          }
  
          //Client will always handle a single seq
          //if(outSequence==Constants.CLIENT_DEFAULD_SEQUENCE_ID)
          //    return outSequence;
  
          Iterator it = outgoingMap.keySet().iterator();
          synchronized (outgoingMap) {
              while (it.hasNext()) {
  
                  String tempSeqId = (String) it.next();
                  ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                          .get(tempSeqId);
                  String tempOutSequence = rsh.getOutSequenceId();
                  if (outSequence.equals(tempOutSequence))
                      return tempSeqId;
  
              }
          }
          return null;
      }
  
      public void displayOutgoingMap() {
          Iterator it = outgoingMap.keySet().iterator();
          System.out.println("------------------------------------");
          System.out.println("      DISPLAYING RESPONSE MAP");
          System.out.println("------------------------------------");
          while (it.hasNext()) {
              String s = (String) it.next();
              System.out.println("\n Sequence id - " + s);
              ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                      .get(s);
  
              Iterator it1 = rsh.getAllKeys().iterator();
              while (it1.hasNext()) {
                  Long l = (Long) it1.next();
                  String msgId = rsh.getMessageId(l);
                  System.out.println("* key -" + l.longValue() + "- MessageID -"
                          + msgId + "-");
              }
          }
          System.out.println("\n");
      }
  
      public void displayIncomingMap() {
          Iterator it = incomingMap.keySet().iterator();
          System.out.println("------------------------------------");
          System.out.println("       DISPLAYING SEQUENCE MAP");
          System.out.println("------------------------------------");
          while (it.hasNext()) {
              String s = (String) it.next();
              System.out.println("\n Sequence id - " + s);
              SequenceHash sh = (SequenceHash) incomingMap.get(s);
  
              Iterator it1 = sh.getAllKeys().iterator();
              while (it1.hasNext()) {
                  Long l = (Long) it1.next();
                  String msgId = sh.getMessageId(l);
                  System.out.println("* key -" + l.longValue() + "- MessageID -"
                          + msgId + "-");
              }
          }
          System.out.println("\n");
      }
  
      public void displayPriorityQueue() {
  
          System.out.println("------------------------------------");
          System.out.println("       DISPLAYING PRIORITY QUEUE");
          System.out.println("------------------------------------");
  
          Iterator it = highPriorityQueue.iterator();
          while (it.hasNext()) {
              RMMessageContext msg = (RMMessageContext) it.next();
              String id = msg.getMessageID();
              int type = msg.getMessageType();
  
              System.out.println("Message " + id + "  Type " + type);
          }
          System.out.println("\n");
      }
  
      public void moveOutgoingMsgToBin(String sequenceId, Long messageNo) {
          String sequence = getSequenceOfOutSequence(sequenceId);
          ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                  .get(sequence);
  
          if (rsh == null) {
              System.out.println("ERROR: RESPONSE SEQ IS NULL " + sequence);
              return;
          }
  
          synchronized (rsh) {
              //Deleting retuns the deleted message.
              RMMessageContext msg = rsh.deleteMessage(messageNo);
              //If we jave already deleted then no message to return.
              if (msg != null) {
  
                  String msgId = msg.getMessageID();
                  System.out
                          .println("INFO: Moving out going messages to bin : 
msgId "
                          + msgId);
                  //Add msg to bin if id isnt null.
                  if (msgId != null)
                      queueBin.put(msgId, msg);
              }
  
          }
      }
  
      public void markOutgoingMessageToDelete(String sequenceId, Long 
messageNo) {
          String sequence = getSequenceOfOutSequence(sequenceId);
          ResponseSequenceHash rsh = (ResponseSequenceHash) outgoingMap
                  .get(sequence);
  
          if (rsh == null) {
              System.out.println("ERROR: RESPONSE SEQ IS NULL " + sequence);
              return;
          }
  
          synchronized (rsh) {
              //Deleting retuns the deleted message.
              rsh.markMessageDeleted(messageNo);
              //If we jave already deleted then no message to return.
          }
  
      }
  
  
      public void movePriorityMsgToBin(String messageId) {
          synchronized (highPriorityQueue) {
              int size = highPriorityQueue.size();
              for (int i = 0; i < size; i++) {
                  RMMessageContext msg = (RMMessageContext) 
highPriorityQueue.get(i);
  
                  if (msg.getMessageID().equals(messageId)) {
  
                      highPriorityQueue.remove(i);
                      queueBin.put(messageId, msg);
                      return;
                  }
              }
          }
      }
  
      public long getNextOutgoingMessageNumber(String seq) {
          ResponseSequenceHash rsh = (ResponseSequenceHash) 
outgoingMap.get(seq);
  
          if (rsh == null) { //saquence not created yet.
              try {
                  createNewOutgoingSequence(seq);
              } catch (QueueException q) {
                  System.out.println(q.getStackTrace());
              }
          }
          rsh = (ResponseSequenceHash) outgoingMap.get(seq);
          synchronized (rsh) {
              Iterator keys = rsh.getAllKeys().iterator();
  
              long msgNo = rsh.nextMessageNumber();
  
              /* while (keys.hasNext()) {
                   System.out.println("HAS KEYS");
                   long temp = ((Long) keys.next()).longValue();
                   System.out.println("TEMP IS "+temp);
                   if (temp > msgNo)
                       msgNo = temp;
               }
  
               msgNo++;*/
               
              System.out.println("RETURNING MSG NUMBER " + msgNo);
              return (msgNo);
          }
      }
  
      public RMMessageContext checkForResponseMessage(String requestId, String 
seqId) {
          SequenceHash sh = (SequenceHash) incomingMap.get(seqId);
          System.out.println("DEFAULT : " + requestId + " SEQ " + seqId);
  
          if (sh == null) {
              System.out.println("ERROR: SEQ IS NULL");
              return null;
          }
  
          synchronized (sh) {
              RMMessageContext msg = sh.getMessageRelatingTo(requestId);
              return msg;
          }
      }
  
      public boolean isRequestMsgPresent(String seqId, String messageId) {
          ResponseSequenceHash rsh = (ResponseSequenceHash) 
outgoingMap.get(seqId);
  
          if (rsh == null) {
              System.out.println("ERROR: SEQ IS NULL");
              return false;
          }
  
          boolean present = false;
          synchronized (rsh) {
              present = rsh.isMessagePresent(messageId);
          }
  
          return present;
  
      }
  
      public String searchForSequenceId(String messageId) {
          Iterator it = outgoingMap.keySet().iterator();
  
          String key = null;
          while (it.hasNext()) {
              key = (String) it.next();
              Object obj = outgoingMap.get(key);
              if (obj != null) {
                  ResponseSequenceHash hash = (ResponseSequenceHash) obj;
                  boolean hasMsg = hash.hasMessageWithId(messageId);
                  if (!hasMsg)
                      key = null;
              }
  
          }
  
          return key;
      }
  
      public Vector getAllAckedMsgNumbers(String seqId) {
          Vector msgNumbers = new Vector();
  
          Iterator it = highPriorityQueue.iterator();
          while (it.hasNext()) {
              RMMessageContext msg = (RMMessageContext) it.next();
              if (msg.getMessageType() != Constants.MSG_TYPE_ACKNOWLEDGEMENT)
                  continue;
  
              SequenceAcknowledgement seqAck = 
msg.getRMHeaders().getSequenceAcknowledgement();
              String sId = seqAck.getIdentifier().getIdentifier();
              if (seqId != sId)    //Sorry. Wrong sequence.
                  continue;
  
              List ackList = seqAck.getAckRanges();
  
              Iterator ackIt = ackList.iterator();
  
              while (ackIt.hasNext()) {
                  AcknowledgementRange ackRng = (AcknowledgementRange) 
ackIt.next();
                  long min = ackRng.getMinValue();
                  long temp = min;
                  while (temp <= ackRng.getMaxValue()) {
                      Long lng = new Long(temp);
                      if (!msgNumbers.contains(lng))  //vector cant hv 
duplicate entries.
                          msgNumbers.add(new Long(temp));
  
                      temp++;
                  }
              }
          }
          return msgNumbers;
      }
  
      public Vector getAllOutgoingMsgNumbers(String seqId) {
          Vector msgNumbers = new Vector();
  
          ResponseSequenceHash rsh = (ResponseSequenceHash) 
outgoingMap.get(seqId);
  
          if (rsh == null) {
              System.out.println("ERROR: SEQ IS NULL " + seqId);
              return msgNumbers;
          }
  
          synchronized (rsh) {
              msgNumbers = rsh.getReceivedMsgNumbers();
          }
  
          return msgNumbers;
      }
  
      public void setResponseReceived(RMMessageContext responseMsg) {
          String requestMsgID = 
responseMsg.getAddressingHeaders().getRelatesTo().toString();
  
          Iterator it = outgoingMap.keySet().iterator();
  
          String key = null;
          while (it.hasNext()) {
              key = (String) it.next();
              Object obj = outgoingMap.get(key);
              if (obj != null) {
                  ResponseSequenceHash hash = (ResponseSequenceHash) obj;
                  boolean hasMsg = hash.hasMessageWithId(requestMsgID);
                  if (!hasMsg)
                  //set the property response received
                      hash.setResponseReceived(requestMsgID);
              }
          }
  
      }
  
      public void setAckReceived(String seqId, long msgNo) {
  
          Iterator it = outgoingMap.keySet().iterator();
  
          String key = null;
          while (it.hasNext()) {
              key = (String) it.next();
              Object obj = outgoingMap.get(key);
  
  
              if (obj != null) {
                  ResponseSequenceHash hash = (ResponseSequenceHash) obj;
                  //System.out.println("************** HASH SEQ IS " + 
hash.getSequenceId() + "      SEQ IS " + seqId + " OUT SEQ IS  " + 
hash.getOutSequenceId());
                  if (hash.getOutSequenceId().equals(seqId)) {
  
                      hash.setAckReceived(msgNo);
                  }
              }
          }
  
      }
  
  
      public Vector getAllIncommingMsgNumbers(String seqId) {
          Vector msgNumbers = new Vector();
          incomingMap.get(seqId);
          
          //Not implemented yet.
          return msgNumbers;
      }
  
      public RMMessageContext getLowPriorityMessageIfAcked() {
  
          int size = lowPriorityQueue.size();
  
          RMMessageContext terminateMsg = null;
          for (int i = 0; i < size; i++) {
  
              RMMessageContext temp;
              temp = (RMMessageContext) lowPriorityQueue.get(i);
              String seqId = temp.getSequenceID();
              // System.out.println(" HASH NOT FOUND SEQ ID " + seqId);
  
              ResponseSequenceHash hash = null;
              hash = (ResponseSequenceHash) outgoingMap.get(seqId);
              if (hash == null) {
                  System.out.println("SandeshaQueue: ERROR: HASH NOT FOUND SEQ 
ID " + seqId);
              }
  
              /*Iterator it1 = outgoingMap.keySet().iterator();
              while(it1.hasNext()){
                  hash = (ResponseSequenceHash) it1.next();
                  if(hash.getOutSequenceId().equals(seqId)){
                      System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 
FOUND SEQ "+ seqId);
                      foundSeq = true;
                      break;
                  }
              } */
  
              if (hash != null) {
                  boolean complete = hash.isAckComplete();
                  if (complete)
                  //lowPriorityQueue.remove(i);
                      terminateMsg = temp;
                  if (terminateMsg != null) {
                      terminateMsg.setSequenceID(hash.getOutSequenceId());
                      lowPriorityQueue.remove(i);
                      break;
                  }
              }
  
          }
  
          return terminateMsg;
      }
  }
  
  
  
  
  1.1                  
ws-fx/sandesha/src/org/apache/sandesha/storage/queue/SequenceHash.java
  
  Index: SequenceHash.java
  ===================================================================
  /*
   * Copyright  1999-2004 The Apache Software Foundation.
   *
   *  Licensed under the Apache License, Version 2.0 (the "License");
   *  you may not use this file except in compliance with the License.
   *  You may obtain a copy of the License at
   *
   *      http://www.apache.org/licenses/LICENSE-2.0
   *
   *  Unless required by applicable law or agreed to in writing, software
   *  distributed under the License is distributed on an "AS IS" BASIS,
   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   *  See the License for the specific language governing permissions and
   *  limitations under the License.
   *
   */
  
  package org.apache.sandesha.storage.queue;
  
  
  import org.apache.axis.message.addressing.RelatesTo;
  import org.apache.sandesha.RMMessageContext;
  
  import java.util.*;
  
  /*
   * Created on Aug 4, 2004 at 5:08:29 PM
   */
  
  /**
   * @author Chamikara Jayalath
   * @author Jaliya Ekanayaka
   */
  
  public class SequenceHash {
  
      private long lastProcessed;
  
      private boolean hasProcessableMessages;
  
      private String sequenceId;
  
      //private String outSequenceId;
      private HashMap hash;
  
      private boolean beingProcessedLock = false; //When true messages are
  
      // currently being processed.
  
      public SequenceHash(String sequenceId) {
          lastProcessed = 0;
          //cacheBottom = 1;
          hasProcessableMessages = false;
          this.sequenceId = sequenceId;
          hash = new HashMap();
      }
  
      public boolean hasProcessableMessages() {
          return hasProcessableMessages;
      }
  
      public String getSequenceId() {
          return sequenceId;
      }
  
      public void setSequenceId(String sequenceId) {
          this.sequenceId = sequenceId;
      }
  
      /**
       * adds the message to map. Also adds a record to cache if needed.
       */
      public Object putNewMessage(Long key, RMMessageContext value) {
  
          Object obj = hash.put(key, value);
          //addToCacheIfNeeded(key);
          refreshHasProcessableMessages();
          return obj;
      }
  
      public boolean removeMessage(long msgId) {
          //TODO: Add messageremoving code if needed.
          boolean removed = false;
  
          Long key = new Long(msgId);
          Object obj = hash.remove(key);
  
          if (obj != null)
              removed = true;
  
          return removed;
      }
  
      public long getNextMessageIdToProcess() {
  
          long id = lastProcessed + 1;
  
          return id;
      }
  
      public RMMessageContext getNextMessageToProcess() {
          Long nextKey = new Long(lastProcessed + 1);
          RMMessageContext msg = (RMMessageContext) hash.get(nextKey);
          if (msg != null) {
              incrementProcessedCount();
              refreshHasProcessableMessages();
          } else {
              setProcessLock(false); // Not a must. (det done in
              // refreshHasProcessableMessages();)
          }
  
          return msg;
  
      }
  
      public Vector getNextMessagesToProcess() {
  
          boolean done = false;
          Vector messages = new Vector();
  
          while (!done) {
              Long nextKey = new Long(lastProcessed + 1);
              Object obj = hash.get(nextKey);
              if (obj != null) {
                  messages.add(obj);
                  incrementProcessedCount();
              } else {
                  setProcessLock(false);
                  done = true; //To exit the loop.
              }
          }
          refreshHasProcessableMessages();
  
          return messages;
      }
  
      private void incrementProcessedCount() {
          lastProcessed++;
      }
  
      private void refreshHasProcessableMessages() {
          Long nextKey = new Long(lastProcessed + 1);
          hasProcessableMessages = hash.containsKey(nextKey);
  
          if (!hasProcessableMessages) //Cant be being procesed if no messages 
to
          // process.
              setProcessLock(false);
      }
  
      public boolean hasMessage(Long msgId) {
          Object obj = hash.get(msgId);
  
          return (!(obj == null));
      }
  
      public void clearSequence(boolean yes) {
          if (!yes)
              return;
  
          hash.clear();
          lastProcessed = 0;
          hasProcessableMessages = false;
      }
  
      public Set getAllKeys() {
          return hash.keySet();
      }
  
      public void setProcessLock(boolean lock) {
          beingProcessedLock = lock;
      }
  
      public boolean isSequenceLocked() {
          return beingProcessedLock;
      }
  
      public String getMessageId(Long key) {
          RMMessageContext msg = (RMMessageContext) hash.get(key);
          if (msg == null)
              return null;
  
          return msg.getMessageID();
  
      }
  
      //Only for client.
      public RMMessageContext getMessageRelatingTo(String relatesTo) {
  
          Iterator it = hash.keySet().iterator();
          RMMessageContext msgToSend = null;
  
          while (it.hasNext()) {
              RMMessageContext msg = (RMMessageContext) hash.get(it.next());
              List lst = msg.getAddressingHeaders().getRelatesTo();
  
              if (lst != null) {
                  RelatesTo rl = (RelatesTo) lst.get(0);
                  String uri = rl.getURI().toString();
                  if (uri.equals(relatesTo))
                      msgToSend = msg;
                  break;
              }
          }
  
          return msgToSend;
          
          /*Long nextKey = new Long(lastProcessed + 1);
          RMMessageContext msg = (RMMessageContext) hash.get(nextKey);
          if (msg != null) {
              incrementProcessedCount();
              refreshHasProcessableMessages();
          } else {
              setProcessLock(false); // Not a must. (det done in
              // refreshHasProcessableMessages();)
          }
  
          return msg;*/
      }
  
      public boolean isMessagePresent(String msgId) {
          boolean b = false;
  
          b = hash.containsKey(msgId);
          return b;
      }
  
      public boolean hasMessageWithId(String msgId) {
          //boolean b = false;
          
          Iterator it = hash.keySet().iterator();
  
          while (it.hasNext()) {
              RMMessageContext msg = (RMMessageContext) it.next();
              if (msg.getMessageID().equals(msgId))
                  return true;
          }
          return false;
      }
  }
  
  

Reply via email to