Author: toad
Date: 2005-10-13 16:20:26 +0000 (Thu, 13 Oct 2005)
New Revision: 7430

Modified:
   branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
Log:
Mostly done the substance of SubscribeSender.
Now need to re-examine the ties to SubscribeHandler.

Modified: branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java    2005-10-13 14:27:51 UTC (rev 7429)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java    2005-10-13 16:20:26 UTC (rev 7430)
@@ -25,17 +25,17 @@
     private PeerNode next;
     private int counter;
     
-    // Still running
+    /** Still running */
     static final int SEARCHING = -1;
-    // Found a closer node than the closest-so-far
+    /** Found a closer node than the closest-so-far */
     static final int SUCCESS = 0;
-    // RNF => a) we are the closest node, we become root, or b) we fail and return RNF
+    /** RNF => a) we are the closest node, we become root, or b) we fail and return RNF. */
     static final int ROUTE_NOT_FOUND = 1;
-    // A node was overloaded
+    /** A node was overloaded */
     static final int REJECTED_OVERLOAD = 2;
-    // An internal error occurred
+    /** An internal error occurred */
     static final int INTERNAL_ERROR = 3;
-    // A node is restarting
+    /** A node further up the tree is restarting its subscription */
     static final int RESTARTING = 4;
 
     static final int INITIAL_TIMEOUT = 5000;
@@ -65,7 +65,7 @@
         this.nearestSoFar = nearestSoFar;
         target = handler.targetLocation();
         node = handler.node;
-        status = -1;
+        status = SEARCHING;
         nodesRoutedTo = new HashSet();
     }
 
@@ -87,7 +87,7 @@
             // Phase 1 (find the node)
             if(status == SEARCHING)
                 runPhase1();
-
+            
             // Phase 2 - wait for change
             if(status == SUCCESS)
                 runPhase2Succeeded();
@@ -132,14 +132,31 @@
             }
             
             if(msg.getSpec() == DMT.FNPRouteNotFound) {
-                
+                // It didn't find anywhere closer than us or someone behind us :(
+               Logger.minor(this, msg.toString()+" in runPhase2Restarting");
+               setStatus(SEARCHING);
+               return;
             }
             
-            // TODO Auto-generated method stub
+            if(msg.getSpec() == DMT.FNPSubscribeSucceeded || msg.getSpec() == 
DMT.FNPSubscribeSucceededNewRoot) {
+                // Success!
+                double rootLoc = msg.getDouble(DMT.ROOT_LOCATION);
+                if(rootLoc < 0.0 || rootLoc > 1.0) {
+                    Logger.error(this, "Invalid root loc: "+rootLoc+" from "+next);
+                    msg = DMT.createFNPUnsubscribe(id, counter);
+                    next.sendAsync(msg, null);
+                    continue;
+                }
+                // Validated success
+                handler.subscribeSucceeded(next, rootLoc);
+                setStatus(SUCCESS);
+                break;
+            }
+            
         } catch (DisconnectedException e) {
             Logger.minor(this, "Lost connection in restarting");
             // Lost him
-            handler.lostParentConnectionAlreadyRestarting();
+            handler.lostParentConnectionWasRestarting();
             setStatus(SEARCHING);
             return;
         }
@@ -156,18 +173,31 @@
          * We can get an FNPSubscribeRestarted, or
          * we can lose the connection.
          */
+       // REDFLAG: Do we want a way for a node to drop a subscription itself, apart from
+       // restarting then failing?
         MessageFilter mf = 
MessageFilter.create().setType(DMT.FNPSubscribeRestarted).setField(DMT.UID,id).setSource(next).setTimeout(120*1000);
+        MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setTimeout(120*1000).setField(DMT.UID, 
id).setType(DMT.FNPRejectedOverload);
+
         while(true) {
             try {
                 Message msg = node.usm.waitFor(mf);
-                if(msg != null) {
+                if(msg == null) {
+                       // Keep on waiting
+                       continue;
+                } else if(msg.getSpec() == DMT.FNPSubscribeRestarted) {
+                       Logger.debug(this, "Restarting");
                     setStatus(RESTARTING);
                     return;
-                } // else continue
+                } else if(msg.getSpec() == DMT.FNPRejectedOverload) {
+                       Logger.debug(this, "Rejected: overload while in succeeded on "+this);
+                       setStatus(SEARCHING);
+                       return;
+                }
+                Logger.error(this, "WTF?: "+msg);
             } catch (DisconnectedException e) {
                 // Lost him
                 Logger.minor(this, "Lost connection in succeeded");
-                handler.lostParentConnectionAlreadyRestarting();
+                handler.lostParentConnectionWasSuccessful();
                 setStatus(SEARCHING);
                 return;
             }
@@ -216,19 +246,22 @@
             /* What can happen?
              * RejectedLoop can happen (ID collision)
              * RejectedOverload can happen, maybe
-             * RouteNotFound ???? probably not
+             * RouteNotFound - yes, if we are closer than any downstream
              * SubscribeSucceeded can definitely happen (~= accepted)
              * SubscribeRestarted can definitely happen (~= accepted)
              * SubscribeSucceededNewRoot ??? probably not
              * 
              * Anything else?
              */
+
+            // Terminal - don't need a counter (no further contact with the node)
+            MessageFilter mfRejectedLoop = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPRejectedLoop).setTimeout(INITIAL_TIMEOUT);
+            MessageFilter mfRejectedOverload = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPRejectedOverload).setTimeout(INITIAL_TIMEOUT);
+            MessageFilter mfRouteNotFound = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPRouteNotFound).setTimeout(INITIAL_TIMEOUT);
             
-            MessageFilter mfRejectedLoop = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPRejectedLoop).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
 counter);
-            MessageFilter mfRejectedOverload = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPRejectedOverload).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
 counter);
-            // These two don't need a counter as they terminate our contact with the node
-            MessageFilter mfSubscribeSucceeded = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPSubscribeSucceeded).setTimeout(INITIAL_TIMEOUT);
-            MessageFilter mfSubscribeRestarted = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPSubscribeRestarted).setTimeout(INITIAL_TIMEOUT);
+            // Non-terminal - DO need a counter
+            MessageFilter mfSubscribeSucceeded = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPSubscribeSucceeded).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
 counter);
+            MessageFilter mfSubscribeRestarted = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPSubscribeRestarted).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
 counter);
             counter++;
             
             MessageFilter mf = 
mfRejectedLoop.or(mfRejectedOverload.or(mfSubscribeSucceeded.or(mfSubscribeRestarted)));
@@ -276,11 +309,24 @@
                 break;
             }
             
+            if(msg.getSpec() == DMT.FNPRouteNotFound) {
+               // We are closer than they are
+               // RNF does not carry a location because of false minima
+               // It does however carry an HTL
+               // But we ignore HTL > what we had before
+               short newHTL = msg.getShort(DMT.HTL);
+               Logger.debug(this, "RNF: "+msg+" my htl currently "+htl);
+               if(htl > newHTL) htl = newHTL;
+               // Try next node, if have any HTL left
+               continue;
+            }
+            
             Logger.error(this, "Unrecognized message: "+msg+" in phase 1");
         }
     }
 
     public void setStatus(int newStatus) {
+       Logger.debug(this, "Setting status to "+newStatus+" on "+this);
         status = newStatus;
         handler.statusChange(this, status);
     }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to