Author: toad
Date: 2005-10-21 13:19:36 +0000 (Fri, 21 Oct 2005)
New Revision: 7442

Added:
   branches/publish-subscribe/freenet/pubsub_notes/
   
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSenderCallback.java
Modified:
   branches/publish-subscribe/freenet/.externalToolBuilders/New_Builder.launch
   branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
   branches/publish-subscribe/freenet/src/freenet/node/Node.java
   branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
Log:
Last revision of pub/sub for a while. IMHO this is a blind alley, but it has
been useful; it will feed into the eventual Working Pub/Sub Architecture, which
will probably have a lot to do with passive requests and possibly something to
do with TUKs.

Modified: 
branches/publish-subscribe/freenet/.externalToolBuilders/New_Builder.launch
===================================================================
--- branches/publish-subscribe/freenet/.externalToolBuilders/New_Builder.launch 
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/.externalToolBuilders/New_Builder.launch 
2005-10-21 13:19:36 UTC (rev 7442)
@@ -5,7 +5,7 @@
 <booleanAttribute key="org.eclipse.jdt.launching.DEFAULT_CLASSPATH" 
value="true"/>
 <stringAttribute key="org.eclipse.ui.externaltools.ATTR_RUN_BUILD_KINDS" 
value="full,incremental,"/>
 <booleanAttribute key="org.eclipse.ant.ui.ATTR_TARGETS_UPDATED" value="true"/>
-<stringAttribute key="org.eclipse.ui.externaltools.ATTR_LOCATION" 
value="${workspace_loc:/Freenet 0.7/build.xml}"/>
+<stringAttribute key="org.eclipse.ui.externaltools.ATTR_LOCATION" 
value="${workspace_loc:/Freenet 0.7 publish subscribe branch/build.xml}"/>
 <booleanAttribute key="org.eclipse.ui.externaltools.ATTR_TRIGGERS_CONFIGURED" 
value="true"/>
 <booleanAttribute key="org.eclipse.debug.core.appendEnvironmentVariables" 
value="true"/>
 <stringAttribute key="org.eclipse.jdt.launching.CLASSPATH_PROVIDER" 
value="org.eclipse.ant.ui.AntClasspathProvider"/>

Modified: branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java     
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java     
2005-10-21 13:19:36 UTC (rev 7442)
@@ -85,6 +85,7 @@
     public static final String MUST_BEAT_LOCATION = "mustBeatLocation";
     public static final String ROOT_LOCATION = "rootLocation";
     public static final String ORDERED_MESSAGE_NUM = "orderedMessageNum";
+    public static final String NEW_UID = "newUID";
 
        //Diagnostic
        public static final MessageType ping = new MessageType("ping") {{
@@ -781,6 +782,11 @@
         msg.set(HTL, htl);
         return msg;
     }
+
+    public static final MessageType FNPCoalesceNotify = new 
MessageType("FNPCoalesceNotify") {{
+       addField(UID, Long.class);
+       addField(NEW_UID, Long.class);
+    }};
     
     public static final MessageType FNPPublishData = new 
MessageType("FNPPublishData") {{
         addField(UID, Long.class);
@@ -872,26 +878,6 @@
         return msg;
     }
     
-    public static final MessageType FNPResubscribeRequest = new 
MessageType("FNPResubscribeRequest") {{
-        addField(RESTART_UID, Long.class);
-        addField(HTL, Short.class);
-        addField(KEY, PublishStreamKey.class);
-        addField(LAST_SEQNO, Long.class);
-        addField(NEAREST_LOCATION, Double.class);
-        addField(MUST_BEAT_LOCATION, Double.class);
-    }};
-    
-    public static final Message createFNPResubscribeRequest(long restartUID, 
short htl, PublishStreamKey key, long lastSeqNum, double nearestSoFar, double 
mustBeatLocation) {
-        Message msg = new Message(FNPResubscribeRequest);
-        msg.set(RESTART_UID, restartUID);
-        msg.set(HTL, htl);
-        msg.set(KEY, key);
-        msg.set(LAST_SEQNO, lastSeqNum);
-        msg.set(NEAREST_LOCATION, nearestSoFar);
-        msg.set(MUST_BEAT_LOCATION, mustBeatLocation);
-        return msg;
-    }
-
     public static final MessageType FNPSubscribeRestarted = new 
MessageType("FNPSubscribeRestarted") {{
         addField(UID, Long.class); // if it is a reply to a SubscribeRequest, 
we need to be able to identify it,
         // if only because of possible race conditions.
@@ -904,7 +890,6 @@
         Message msg = new Message(FNPSubscribeRestarted);
         msg.set(UID, uid);
         msg.set(RESTART_UID, restartUID);
-        msg.set(MUST_BEAT_LOCATION, orderedMessageNum);
         return msg;
     }
     
@@ -922,18 +907,6 @@
         return msg;
     }
 
-    public static final MessageType FNPSubscribeSucceededNewRoot = new 
MessageType("FNPSubscribeSucceededNewRoot") {{
-        addField(ORDERED_MESSAGE_NUM, Integer.class);
-        addField(UID, Long.class);
-    }};
-    
-    public static final Message createFNPSubscribeSucceededNewRoot(long uid, 
PublishStreamKey key, int orderedMessageNum) {
-        Message msg = new Message(FNPSubscribeSucceeded);
-        msg.set(UID, uid);
-        msg.set(ORDERED_MESSAGE_NUM, orderedMessageNum);
-        return msg;
-    }
-
     public static final MessageType FNPUnsubscribe = new 
MessageType("FNPUnsubscribe") {{
        addField(UID, Long.class);
     }};

Modified: branches/publish-subscribe/freenet/src/freenet/node/Node.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/Node.java       
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/src/freenet/node/Node.java       
2005-10-21 13:19:36 UTC (rev 7442)
@@ -761,4 +761,9 @@
     public ClientSubscription subscribe(ClientPublishStreamKey key, 
SubscriptionCallback cb) {
         return subscriptions.localSubscribe(key, cb);
     }
+
+    /** Reject any requests made with the given ID. */
+       public synchronized void blockID(long newID) {
+               recentlyCompletedIDs.push(new Long(newID));
+       }
 }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java     
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java     
2005-10-21 13:19:36 UTC (rev 7442)
@@ -76,8 +76,6 @@
             return handleInsertRequest(m);
         } else if(spec == DMT.FNPSubscribeRequest) {
             return node.subscriptions.handleSubscribeRequest(m);
-        } else if(spec == DMT.FNPResubscribeRequest) {
-            return node.subscriptions.handleResubscribeRequest(m);
         } else if(spec == DMT.FNPPublishData) {
             return node.subscriptions.handlePublishData(m);
         }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java   
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java   
2005-10-21 13:19:36 UTC (rev 7442)
@@ -79,6 +79,14 @@
                 Logger.error(this, "Could not send RNF to "+source);
             }
         } else {
+               // Accept
+               Message accept = DMT.createFNPAccepted(id);
+               try {
+                               subscriber.sendAsync(accept, null);
+                       } catch (NotConnectedException e1) {
+                               Logger.minor(this, "Not connected while sending 
Accepted");
+                               return;
+                       }
             // Add self to parent
             sub.addSubscriberHandler(this);
             // FIXME: lock safety? Should do this off-thread?
@@ -105,7 +113,7 @@
         if(subscribed)
             msg = DMT.createFNPSubscribeSucceeded(origID, counter++, rootLoc);
         else
-            msg = DMT.createFNPSubscribeRestarted(origID, sub.getRestartUID(), 
counter++);
+            msg = DMT.createFNPSubscribeRestarted(origID, 
sub.getSubscribingUID(), counter++);
         try {
             subscriber.sendAsync(msg, null);
             return true;

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java    
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java    
2005-10-21 13:19:36 UTC (rev 7442)
@@ -10,7 +10,7 @@
 import freenet.support.Logger;
 
 /**
- * SubscriptionRequest sender. Analogous to RequestSender.
+ * SubscribeRequest sender. Analogous to RequestSender.
  */
 public class SubscribeSender implements Runnable {
 
@@ -19,7 +19,7 @@
     final double target;
     private short htl;
     final PeerNode source;
-    final SubscriptionHandler handler;
+    final SubscribeSenderCallback handler;
     final SubscribeHandler origSubscriber;
     final double nearestSoFar;
     private final HashSet nodesRoutedTo;
@@ -36,10 +36,11 @@
     static final int REJECTED_OVERLOAD = 2;
     /** An internal error occurred */
     static final int INTERNAL_ERROR = 3;
-    /** A node further up the tree is restarting its subscription */
+    /** A node further up the tree is restarting its subscription. We may be 
called upon to be part of its search. */
     static final int RESTARTING = 4;
 
     static final int INITIAL_TIMEOUT = 5000;
+    static final int SEARCH_TIMEOUT = 120*1000;
     static final int RESTART_TIMEOUT = 120*1000;
     
     private int status;
@@ -60,14 +61,15 @@
      * @param sub The SubscribeHandler which originated this SubscribeSender, 
if any.
      * Null if this is a locally originated subscription.
      */
-    public SubscribeSender(long id, short htl, PeerNode source, double 
nearestSoFar, SubscriptionHandler handler, SubscribeHandler sub) {
+    public SubscribeSender(long id, short htl, PeerNode source, double 
nearestSoFar, 
+               SubscribeSenderCallback handler, SubscribeHandler sub) {
         this.id = id;
         this.htl = htl;
         this.source = source;
         this.handler = handler;
         this.nearestSoFar = nearestSoFar;
         target = handler.targetLocation();
-        node = handler.node;
+        node = handler.getNode();
         status = SEARCHING;
         nodesRoutedTo = new HashSet();
         origSubscriber = sub;
@@ -117,16 +119,14 @@
          * Error: RejectedOverload, RouteNotFound
          */
         MessageFilter mfSubscribeSucceeded = 
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
 id).setField(DMT.ORDERED_MESSAGE_NUM, 
counter).setType(DMT.FNPSubscribeSucceeded);
-        MessageFilter mfSubscribeSucceededNewRoot = 
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
 id).setField(DMT.ORDERED_MESSAGE_NUM, 
counter).setType(DMT.FNPSubscribeSucceededNewRoot);
         MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
 id).setType(DMT.FNPRejectedOverload);
         MessageFilter mfRouteNotFound = 
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
 id).setType(DMT.FNPRouteNotFound);
         
-        MessageFilter mf = 
mfSubscribeSucceeded.or(mfSubscribeSucceededNewRoot.or(mfRejectedOverload.or(mfRouteNotFound)));
-        counter++;
+        MessageFilter mf = 
mfSubscribeSucceeded.or(mfRejectedOverload.or(mfRouteNotFound));
         
         try {
             Message msg = node.usm.waitFor(mf);
-            
+
             if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
                 // :(
                 // Timeout or rejected:overload
@@ -135,7 +135,7 @@
                 setStatus(REJECTED_OVERLOAD);
                 return;
             }
-            
+
             if(msg.getSpec() == DMT.FNPRouteNotFound) {
                 // It didn't find anywhere closer than us or someone behind us 
:(
                Logger.minor(this, msg.toString()+" in runPhase2Restarting");
@@ -144,7 +144,8 @@
                return;
             }
             
-            if(msg.getSpec() == DMT.FNPSubscribeSucceeded || msg.getSpec() == 
DMT.FNPSubscribeSucceededNewRoot) {
+            if(msg.getSpec() == DMT.FNPSubscribeSucceeded) {
+               counter++;
                 // Success!
                 double rootLoc = msg.getDouble(DMT.ROOT_LOCATION);
                 if(rootLoc < 0.0 || rootLoc > 1.0) {
@@ -163,7 +164,7 @@
                 setStatus(SUCCESS);
                 return;
             }
-            
+
         } catch (DisconnectedException e) {
             Logger.minor(this, "Lost connection in restarting");
             // Lost him
@@ -188,6 +189,7 @@
        // restarting then failing?
         MessageFilter mf = 
MessageFilter.create().setType(DMT.FNPSubscribeRestarted).setField(DMT.UID,id).setSource(next).setTimeout(120*1000);
         MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setTimeout(120*1000).setField(DMT.UID, 
id).setType(DMT.FNPRejectedOverload);
+        mf = mf.or(mfRejectedOverload);
 
         while(true) {
             try {
@@ -219,7 +221,8 @@
     }
 
     /**
-     * Find a node to subscribe to.
+     * Find a node to subscribe to. Wait until we are failed, RESTARTING or
+     * SUCCEEDED.
      */
     private void runPhase1() {
         while(true) {
@@ -247,8 +250,8 @@
             nodesRoutedTo.add(next);
             
             // Create message, including new counter
-            counter = node.random.nextInt();
-            Message msg = DMT.createFNPSubscribeRequest(id, htl, handler.key, 
handler.getLastSeqNum(), nearestSoFar, counter);            
+            counter = node.random.nextInt(); // new counter for this node
+            Message msg = DMT.createFNPSubscribeRequest(id, htl, 
handler.getKey(), handler.getLastSeqNum(), nearestSoFar, counter);            
             
             try {
                 // Send it
@@ -257,42 +260,26 @@
                 Logger.normal(this, "Disconnected from "+next);
                 continue;
             }
+
+            // Wait for Accepted
             
-            // Wait for success/failure/etc
-            
-            /* What can happen?
-             * RejectedLoop can happen (ID collision)
-             * RejectedOverload can happen, maybe
-             * RouteNotFound - yes, if we are closer than any downstream
-             * SubscribeSucceeded can definitely happen (~= accepted)
-             * SubscribeRestarted can definitely happen (~= accepted)
-             * SubscribeSucceededNewRoot ??? probably not
-             * 
-             * Anything else?
-             */
-
-            // Terminal - don't need a counter (no further contact with the 
node)
+            MessageFilter mfAccepted = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPAccepted).setTimeout(INITIAL_TIMEOUT);
             MessageFilter mfRejectedLoop = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPRejectedLoop).setTimeout(INITIAL_TIMEOUT);
             MessageFilter mfRejectedOverload = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPRejectedOverload).setTimeout(INITIAL_TIMEOUT);
-            MessageFilter mfRouteNotFound = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPRouteNotFound).setTimeout(INITIAL_TIMEOUT);
+
+            MessageFilter mf = 
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
             
-            // Non-terminal - DO need a counter
-            MessageFilter mfSubscribeSucceeded = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPSubscribeSucceeded).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
 counter);
-            MessageFilter mfSubscribeRestarted = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPSubscribeRestarted).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
 counter);
-            counter++;
-            
-            MessageFilter mf = 
mfRejectedLoop.or(mfRejectedOverload.or(mfSubscribeSucceeded.or(mfSubscribeRestarted.or(mfRouteNotFound))));
-            
+            msg = null;
             try {
-                msg = node.usm.waitFor(mf);
-            } catch (DisconnectedException e1) {
-                Logger.normal(this, "Disconnected: "+next+" while waiting");
+                               msg = node.usm.waitFor(mf);
+                       } catch (DisconnectedException e2) {
+                Logger.normal(this, "Disconnected: "+next+" while waiting for 
accepted in "+this);
                 htl = node.decrementHTL(source, htl);
                 continue;
-            }
+                       }
             
             if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
-                // Timeout or rejected:overload
+                // Timeout or rejected:overload; fatal error
                 Logger.error(this, "Timeout or rejected:overload: "+msg+" for 
"+id+" to "+next);
                 handler.senderRejectedOverload();
                 setStatus(REJECTED_OVERLOAD);
@@ -304,46 +291,145 @@
                 htl = node.decrementHTL(source, htl);
                 continue;
             }
-            
-            if(msg.getSpec() == DMT.FNPSubscribeSucceeded) {
-                // Success!
-                double rootLoc = msg.getDouble(DMT.ROOT_LOCATION);
-                if(rootLoc < 0.0 || rootLoc > 1.0) {
-                    Logger.error(this, "Invalid root loc: "+rootLoc+" from 
"+next);
-                    msg = DMT.createFNPUnsubscribe(id, counter);
-                    try {
-                                               next.sendAsync(msg, null);
-                                       } catch (NotConnectedException e) {
-                                               Logger.error(this, 
"NotConnectedException sending "+msg+" to "+next);
-                                       }
-                    continue;
-                }
-                // Validated success
-                handler.subscribeSucceeded(next, rootLoc, 
(short)(origSubscriber.origHTL - htl));
-                setStatus(SUCCESS);
-                break;
+
+            if(msg.getSpec() != DMT.FNPAccepted) {
+               Logger.error(this, "Unexpected message waiting for Accepted in 
"+this+": "+msg);
+               continue;
             }
             
-            if(msg.getSpec() == DMT.FNPSubscribeRestarted) {
-                long restartID = msg.getLong(DMT.RESTART_UID);
-                handler.subscribeRestarted(next, restartID);
-                setStatus(RESTARTING);
-                break;
-            }
+            // Accepted
             
-            if(msg.getSpec() == DMT.FNPRouteNotFound) {
-               // We are closer than they are
-               // RNF does not carry a location because of false minima
-               // It does however carry an HTL
-               // But we ignore HTL > what we had before
-               short newHTL = msg.getShort(DMT.HTL);
-               Logger.debug(this, "RNF: "+msg+" my htl currently "+htl);
-               if(htl > newHTL) htl = newHTL;
-               // Try next node, if have any HTL left
-               continue;
-            }
-            
-            Logger.error(this, "Unrecognized message: "+msg+" in phase 1");
+            while (true) {
+
+                               // Wait for success, restart, failure
+
+                               /* 120 second timeout.
+                                * What can happen?
+                                * 
+                                * RejectedOverload (or timeout) - overload 
(fatal)
+                                * RouteNotFound - can't find it, and can't 
become root
+                                * SubscribeSucceeded - go to SUCCEEDED
+                                * SubscribeRestarted - go to RESTARTING
+                                * CoalesceNotify - make sure requests with 
given ID are rejected
+                                */
+
+                               // Terminal - does not need a counter (no 
further contact with the node)
+                               MessageFilter mfRouteNotFound = 
MessageFilter.create()
+                                               .setField(DMT.UID, 
id).setSource(next).setType(
+                                                               
DMT.FNPRouteNotFound)
+                                               .setTimeout(SEARCH_TIMEOUT);
+                               mfRejectedOverload.setTimeout(SEARCH_TIMEOUT);
+
+                               // Non-terminal - DO need a counter
+                               MessageFilter mfSubscribeSucceeded = 
MessageFilter.create()
+                                               .setField(DMT.UID, 
id).setSource(next).setType(
+                                                               
DMT.FNPSubscribeSucceeded).setTimeout(
+                                                               
SEARCH_TIMEOUT).setField(
+                                                               
DMT.ORDERED_MESSAGE_NUM, counter);
+                               MessageFilter mfSubscribeRestarted = 
MessageFilter.create()
+                                               .setField(DMT.UID, 
id).setSource(next).setType(
+                                                               
DMT.FNPSubscribeRestarted).setTimeout(
+                                                               
SEARCH_TIMEOUT).setField(
+                                                               
DMT.ORDERED_MESSAGE_NUM, counter);
+
+                               // Misc
+                               MessageFilter mfCoalesceNotify = 
MessageFilter.create()
+                                               .setField(DMT.UID, 
id).setSource(next).setType(
+                                                               
DMT.FNPCoalesceNotify).setTimeout(
+                                                               SEARCH_TIMEOUT);
+
+                               mf = mfRejectedOverload.or(mfSubscribeSucceeded
+                                               
.or(mfSubscribeRestarted.or(mfRouteNotFound
+                                                               
.or(mfCoalesceNotify))));
+
+                               try {
+                                       msg = node.usm.waitFor(mf);
+                               } catch (DisconnectedException e1) {
+                                       Logger.normal(this, "Disconnected: " + 
next
+                                                       + " while waiting in " 
+ this);
+                                       htl = node.decrementHTL(source, htl);
+                                       break; // try next node
+                               }
+
+                               if (msg == null || msg.getSpec() == 
DMT.FNPRejectedOverload) {
+                                       // Timeout or rejected:overload
+                                       Logger.error(this, "Timeout or 
rejected:overload: " + msg
+                                                       + " for " + id + " to " 
+ next);
+                                       handler.senderRejectedOverload();
+                                       setStatus(REJECTED_OVERLOAD);
+                                       return;
+                               }
+
+                               if (msg.getSpec() == DMT.FNPSubscribeSucceeded) 
{
+                                       counter++;
+                                       // Success! Go to SUCCEEDED
+                                       double rootLoc = 
msg.getDouble(DMT.ROOT_LOCATION);
+                                       if (rootLoc < 0.0 || rootLoc > 1.0) {
+                                               Logger.error(this, "Invalid 
root loc: " + rootLoc
+                                                               + " from " + 
next);
+                                               msg = 
DMT.createFNPUnsubscribe(id, counter);
+                                               try {
+                                                       next.sendAsync(msg, 
null);
+                                               } catch (NotConnectedException 
e) {
+                                                       Logger.error(this, 
"NotConnectedException sending "
+                                                                       + msg + 
" to " + next);
+                                               }
+                                               break; // try next node
+                                       }
+                                       // Validated success
+                                       if (!handler.subscribeSucceeded(next, 
rootLoc,
+                                                       (short) 
(origSubscriber.origHTL - htl))) {
+                                               Logger.error(this,
+                                                               "Rejected root 
in successful subscription: "
+                                                                               
+ rootLoc + " from " + next);
+                                               Message m = 
DMT.createFNPUnsubscribe(id, counter);
+                                               try {
+                                                       next.sendAsync(m, null);
+                                               } catch (NotConnectedException 
e) {
+                                                       // Unusual, but not 
particularly interesting
+                                                       Logger.minor(this,
+                                                                       "Lost 
connection rejecting successful subscription to "
+                                                                               
        + next + " on " + this);
+                                               }
+                                       }
+                                       setStatus(SUCCESS);
+                                       return;
+                               }
+
+                               if (msg.getSpec() == DMT.FNPSubscribeRestarted) 
{
+                                       counter++;
+                                       long restartID = 
msg.getLong(DMT.RESTART_UID);
+                                       handler.subscribeRestarted(next, 
restartID);
+                                       setStatus(RESTARTING);
+                                       return;
+                               }
+
+                               if (msg.getSpec() == DMT.FNPRouteNotFound) {
+                                       // We are closer than they are
+                                       // RNF does not carry a location 
because of false minima
+                                       // It does however carry an HTL
+                                       // But we ignore HTL > what we had 
before
+                                       short newHTL = msg.getShort(DMT.HTL);
+                                       Logger.debug(this, "RNF: " + msg + " my 
htl currently "
+                                                       + htl);
+                                       if (htl > newHTL)
+                                               htl = newHTL;
+                                       // Try next node, if have any HTL left
+                                       break;
+                               }
+
+                               if (msg.getSpec() == DMT.FNPCoalesceNotify) {
+                                       long newID = msg.getLong(DMT.NEW_UID);
+                                       node.blockID(newID);
+                                       // FIXME: what if it has already 
arrived? it will time out... so what?
+                                       Logger.minor(this, "Received coalesce 
in " + this + " : "
+                                                       + id + " -> " + newID);
+                                       continue; // more messages to come
+                               }
+
+                               Logger.error(this, "Unrecognized message: " + 
msg
+                                               + " in phase 1");
+                       }
         }
     }
 

Added: 
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSenderCallback.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSenderCallback.java
    2005-10-21 13:17:13 UTC (rev 7441)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSenderCallback.java
    2005-10-21 13:19:36 UTC (rev 7442)
@@ -0,0 +1,51 @@
+package freenet.node;
+
+import freenet.keys.PublishStreamKey;
+
+/**
+ * Interface for SubscribeSender to call. Implemented by SubscriptionHandler,
+ * and others.
+ */
+interface SubscribeSenderCallback {
+
+       /** The key, as a double. */
+       double targetLocation();
+
+       /** The Node we are part of. */
+       Node getNode();
+
+       /** The restart UID on the SubscribeRestarted message recently received 
*/
+       long getUpstreamRestartUID();
+
+       /** The node we routed to rejected the request due to overload. */
+       void senderRejectedOverload();
+
+       /** The node we routed to RNF'ed while it was RESTARTING. */
+       void senderRestartingSearchingRNF();
+
+       /** Got an invalid SubscribeSucceeded message while RESTARTING! */
+       void invalidSuccessInRestarting();
+
+       /** Success! @return True unless we reject the root location. */
+       boolean subscribeSucceeded(PeerNode next, double rootLoc, short 
htlDiff);
+
+       /** Lost the connection to the parent node, while RESTARTING. */
+       void lostParentConnectionWasRestarting();
+
+       /** Got a SubscribeRestarted */
+       void subscribeRestarted(PeerNode next, long restartID);
+
+       /** Lost the connection to the parent node, while subscribed. */
+       void lostParentConnectionWasSuccessful();
+
+       /** Could not find the stream. */
+       void senderRNF();
+
+       /** Get the key being searched for */
+       PublishStreamKey getKey();
+
+       /** Get the seqno of the last packet we saw, or rather the last one that
+        * we don't want. */
+       long getLastSeqNum();
+
+}

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java    
    2005-10-21 13:17:13 UTC (rev 7441)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java    
    2005-10-21 13:19:36 UTC (rev 7442)
@@ -21,7 +21,7 @@
  * May have a parent node, or may be the root.
  * May have many child nodes.
  */
-public class SubscriptionHandler {
+public class SubscriptionHandler implements SubscribeSenderCallback {
     
     static final int KEEP_PACKETS = 32;
     
@@ -563,7 +563,7 @@
                                Logger.minor(this, "Rejected because orig 
closer than root, but conn closed on "+id+" for "+this);
                        }
                        // There *may* be a more optimal route somewhere...
-                       maybeRestart();
+                       maybeResubscribe();
         }
         
         try {
@@ -601,6 +601,7 @@
         synchronized(this) {
             if(subscribed) return;
             if(sender != null) return;
+            subscribingUID = id;
             sender = new SubscribeSender(id, htl, source, nearest, this, sub);
         }
         Thread t = new Thread(sender);
@@ -711,7 +712,7 @@
                     subscribing = true;
                 }
             }
-            startSubscribe(random.nextLong(), Node.MAX_HTL, 
node.lm.getLocation().getValue(), null, null);
+            startSubscribe(subscribingUID, Node.MAX_HTL, 
node.lm.getLocation().getValue(), null, null);
         } finally {
             if(restartingNow && sender == null)
                 subscribing = false;
@@ -811,7 +812,7 @@
         * attempt with the next SubscribeHandler.
         * @throws DroppingSubscriptionHandlerException 
         */
-       void senderRNF() {
+       public void senderRNF() {
                Logger.minor(this, "Sender could not find stream on "+this);
                boolean becomeRoot = false;
                short htlDiff = 0;
@@ -891,7 +892,7 @@
         * Forward it to the original requestor, and run another one if 
possible.
         * If this was a local request, tell all clients they are disconnected.
         */
-       void senderRejectedOverload() {
+       public void senderRejectedOverload() {
                ClientSubscriptionHandler[] clients;
                synchronized(this) {
                        SubscribeHandler origSub = sender.origSubscriber;
@@ -917,7 +918,7 @@
        }
 
        /** Upstream restarted */
-       void subscribeRestarted(PeerNode next, long restartID) {
+       public void subscribeRestarted(PeerNode next, long restartID) {
                SubscribeHandler[] subscribers;
                synchronized(this) {
                        parent = next;
@@ -945,17 +946,17 @@
         * Got an RNF while upstream was restarting.
         * Just send our old restart ID out.
         */
-       void senderRestartingSearchingRNF() {
+       public void senderRestartingSearchingRNF() {
                upstreamRestartFailed();
        }
        
        /** Lost connection to node which was restarting. */
-       void lostParentConnectionWasRestarting() {
+       public void lostParentConnectionWasRestarting() {
                upstreamRestartFailed();
        }
 
        /** Restarting would-be parent sent an invalid success message */
-       void invalidSuccessInRestarting() {
+       public void invalidSuccessInRestarting() {
                upstreamRestartFailed();
        }
 
@@ -980,4 +981,12 @@
        public long getUpstreamRestartUID() {
                return upstreamRestartUID;
        }
+
+       public Node getNode() {
+               return node;
+       }
+
+       public PublishStreamKey getKey() {
+               return key;
+       }
 }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java    
    2005-10-21 13:17:13 UTC (rev 7441)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java    
    2005-10-21 13:19:36 UTC (rev 7442)
@@ -155,8 +155,7 @@
 
     /**
      * Handle a SubscribeRequest.
-     * @return True unless we want the message to be put back
-     * onto the queue.
+     * @return True unless we want the message to be put back onto the queue.
      */
     public boolean handleSubscribeRequest(Message m) {
         // Do we have the stream in question, firstly?

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to