Author: fhanik
Date: Tue Feb 28 10:47:46 2006
New Revision: 381740

URL: http://svn.apache.org/viewcvs?rev=381740&view=rev
Log:
Completed the order protocol

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?rev=381740&r1=381739&r2=381740&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 Tue Feb 28 10:47:46 2006
@@ -52,11 +52,25 @@
         int msgnr = 
XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
         msg.getMessage().trim(4);
         MessageOrder order = new MessageOrder(msgnr,msg);
-        processIncoming(order);
+        if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
         //getPrevious().messageReceived(msg);
     }
-    public synchronized void processIncoming(MessageOrder order) {
-int val = order.getMsgNr();
+    
+    public synchronized void processLeftOvers(Member member, boolean force) {
+        MessageOrder tmp = (MessageOrder)incoming.get(member);
+        if ( force ) {
+            Counter cnt = getInCounter(member);
+            cnt.setCounter(Integer.MAX_VALUE);
+        }
+        if ( tmp!= null ) processIncoming(tmp);
+    }
+    /**
+     * 
+     * @param order MessageOrder
+     * @return boolean - true if a message expired and was processed
+     */
+    public synchronized boolean processIncoming(MessageOrder order) {
+        boolean result = false;
         Member member = order.getMessage().getAddress();
         Counter cnt = getInCounter(member);
         
@@ -86,10 +100,12 @@
             if ( tmp.isExpired(expire) ) {
                 //reset the head
                 if ( tmp == head ) head = tmp.next;
+                cnt.setCounter(tmp.getMsgNr()+1);
                 if ( getForwardExpired() ) 
super.messageReceived(tmp.getMessage());
                 tmp.setMessage(null);
                 tmp = tmp.next;
-                if ( prev != null ) prev.next = tmp;                
+                if ( prev != null ) prev.next = tmp;  
+                result = true;
             } else {
                 prev = tmp;
                 tmp = tmp.next;
@@ -97,6 +113,7 @@
         }
         if ( head == null ) incoming.remove(member);
         else incoming.put(member, head);
+        return result;
     }
     
     public void memberAdded(Member member) {
@@ -110,6 +127,8 @@
         //notify upwards
         outcounter.remove(member);
         incounter.remove(member);
+        //clear the remaining queue
+        processLeftOvers(member,true);
         super.memberDisappeared(member);
     }
     



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to