Author: toad
Date: 2005-10-12 19:50:08 +0000 (Wed, 12 Oct 2005)
New Revision: 7425

Modified:
   branches/publish-subscribe/freenet/.classpath
   branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
   
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
   branches/publish-subscribe/freenet/src/freenet/node/InsertSender.java
   branches/publish-subscribe/freenet/src/freenet/node/Node.java
   branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
   branches/publish-subscribe/freenet/src/freenet/node/PacketSender.java
   branches/publish-subscribe/freenet/src/freenet/node/PublishHandlerSender.java
   branches/publish-subscribe/freenet/src/freenet/node/RequestSender.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
   
branches/publish-subscribe/freenet/src/freenet/support/NumberedRecentItems.java
Log:
Current status of publish/subscribe implementation, on a branch. Does not build!
The main freenet 0.7 repository is trunk/freenet, which does build.

Modified: branches/publish-subscribe/freenet/.classpath
===================================================================
--- branches/publish-subscribe/freenet/.classpath       2005-10-12 18:56:58 UTC 
(rev 7424)
+++ branches/publish-subscribe/freenet/.classpath       2005-10-12 19:50:08 UTC 
(rev 7425)
@@ -1,6 +1,9 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
-       <classpathentry 
excluding="test/**|org/spaceroots/mantissa/random/MersenneTwisterTest.java" 
kind="src" path="src"/>
+       <classpathentry 
excluding="org/spaceroots/mantissa/random/MersenneTwisterTest.java|test/*" 
kind="src" path="src"/>
        <classpathentry kind="con" 
path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+       <classpathentry combineaccessrules="false" kind="src" path="/Contrib"/>
+       <classpathentry kind="lib" path="/usr/src/cvs/junit3.8.1.jar"/>
+       <classpathentry kind="lib" 
path="/usr/src/cvs/freenet-stable/lib/freenet-ext.jar"/>
        <classpathentry kind="output" path="bin"/>
 </classpath>

Modified: branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java     
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java     
2005-10-12 19:50:08 UTC (rev 7425)
@@ -79,6 +79,12 @@
     public static final String BLOCK_HEADERS = "blockHeaders";
     public static final String DATA_INSERT_REJECTED_REASON = 
"dataInsertRejectedReason";
     public static final String STREAM_SEQNO = "streamSequenceNumber";
+    public static final String LAST_SEQNO = "lastSequenceNumber";
+    public static final String SUGGESTED_SEQNO = "suggestedSequenceNumber";
+    public static final String RESTART_UID = "restartUID";
+    public static final String MUST_BEAT_LOCATION = "mustBeatLocation";
+    public static final String ROOT_LOCATION = "rootLocation";
+    public static final String ORDERED_MESSAGE_NUM = "orderedMessageNum";

        //Diagnostic
        public static final MessageType ping = new MessageType("ping") {{
@@ -778,21 +784,17 @@

     public static final MessageType FNPPublishData = new 
MessageType("FNPPublishData") {{
         addField(UID, Long.class);
-        addField(HTL, Short.class);
         addField(DATA, ShortBuffer.class);
         addField(KEY, PublishStreamKey.class);
         addField(STREAM_SEQNO, Long.class);
-        addField(NEAREST_LOCATION, Double.class);
     }};

-    public static final Message createFNPPublishData(short htl, byte[] data, 
PublishStreamKey key, long seqNo, long id, double nearestLoc) {
+    public static final Message createFNPPublishData(byte[] data, 
PublishStreamKey key, long seqNo, long id) {
         Message msg = new Message(FNPPublishData);
-        msg.set(HTL, htl);
         msg.set(KEY, key);
         msg.set(DATA, new ShortBuffer(data));
         msg.set(STREAM_SEQNO, seqNo);
         msg.set(UID, id);
-        msg.set(NEAREST_LOCATION, nearestLoc);
         return msg;
     }

@@ -830,6 +832,120 @@
         return msg;
     }

+    public static final MessageType FNPPublishDataRejectedRestarting = new 
MessageType("FNPPublishDataRejectedRestarting") {{
+        addField(UID, Long.class);
+    }};
+    
+    public static final Message createFNPPublishDataRejectedRestarting(long 
id) {
+        Message msg = new Message(FNPPublishDataRejectedRestarting);
+        msg.set(UID, id);
+        return msg;
+    }
+    
+    public static final MessageType FNPPublishDataNotSubscribed = new 
MessageType("FNPPublishDataNotSubscribed") {{
+        addField(UID, Long.class);
+    }};
+    
+    public static final Message createFNPPublishDataNotSubscribed(long uid) {
+        Message msg = new Message(FNPPublishDataNotSubscribed);
+        msg.set(UID, uid);
+        return msg;
+    }
+    
+    public static final MessageType FNPSubscribeRequest = new 
MessageType("FNPSubscribeRequest") {{
+        addField(UID, Long.class);
+        addField(HTL, Short.class);
+        addField(KEY, PublishStreamKey.class);
+        addField(LAST_SEQNO, Long.class);
+        addField(NEAREST_LOCATION, Double.class);
+        addField(ORDERED_MESSAGE_NUM, Integer.class);
+    }};
+    
+    public static final Message createFNPSubscribeRequest(long uid, short htl, 
PublishStreamKey key, long lastSeqNum, double nearestSoFar, int orderedSeqNo) {
+        Message msg = new Message(FNPSubscribeRequest);
+        msg.set(UID, uid);
+        msg.set(HTL, htl);
+        msg.set(KEY, key);
+        msg.set(LAST_SEQNO, lastSeqNum);
+        msg.set(NEAREST_LOCATION, nearestSoFar);
+        msg.set(ORDERED_MESSAGE_NUM, orderedSeqNo);
+        return msg;
+    }
+    
+    public static final MessageType FNPResubscribeRequest = new 
MessageType("FNPResubscribeRequest") {{
+        addField(RESTART_UID, Long.class);
+        addField(HTL, Short.class);
+        addField(KEY, PublishStreamKey.class);
+        addField(LAST_SEQNO, Long.class);
+        addField(NEAREST_LOCATION, Double.class);
+        addField(MUST_BEAT_LOCATION, Double.class);
+    }};
+    
+    public static final Message createFNPResubscribeRequest(long restartUID, 
short htl, PublishStreamKey key, long lastSeqNum, double nearestSoFar, double 
mustBeatLocation) {
+        Message msg = new Message(FNPResubscribeRequest);
+        msg.set(RESTART_UID, restartUID);
+        msg.set(HTL, htl);
+        msg.set(KEY, key);
+        msg.set(LAST_SEQNO, lastSeqNum);
+        msg.set(NEAREST_LOCATION, nearestSoFar);
+        msg.set(MUST_BEAT_LOCATION, mustBeatLocation);
+        return msg;
+    }
+
+    public static final MessageType FNPSubscribeRestarted = new 
MessageType("FNPSubscribeRestarted") {{
+        addField(UID, Long.class); // if it is a reply to a SubscribeRequest, 
we need to be able to identify it,
+        // if only because of possible race conditions.
+        
+        addField(RESTART_UID, Long.class);
+        addField(ORDERED_MESSAGE_NUM, Integer.class);
+    }};
+    
+    public static final Message createFNPSubscribeRestarted(long uid, long 
restartUID, int orderedMessageNum) { // builds an FNPSubscribeRestarted message
+        Message msg = new Message(FNPSubscribeRestarted);
+        msg.set(UID, uid);
+        msg.set(RESTART_UID, restartUID);
+        msg.set(ORDERED_MESSAGE_NUM, orderedMessageNum); // Integer field declared by FNPSubscribeRestarted (was MUST_BEAT_LOCATION)
+        return msg;
+    }
+    
+    public static final MessageType FNPSubscribeSucceeded = new 
MessageType("FNPSubscribeSucceeded") {{
+        addField(ORDERED_MESSAGE_NUM, Integer.class);
+        addField(UID, Long.class);
+        addField(ROOT_LOCATION, Double.class);
+    }};
+
+    public static final Message createFNPSubscribeSucceeded(long uid, int 
orderedMessageNum, double rootLoc) {
+        Message msg = new Message(FNPSubscribeSucceeded);
+        msg.set(UID, uid);
+        msg.set(ORDERED_MESSAGE_NUM, orderedMessageNum);
+        msg.set(ROOT_LOCATION, rootLoc);
+        return msg;
+    }
+
+    public static final MessageType FNPSubscribeSucceededNewRoot = new 
MessageType("FNPSubscribeSucceededNewRoot") {{
+        addField(ORDERED_MESSAGE_NUM, Integer.class);
+        addField(UID, Long.class);
+    }};
+    
+    public static final Message createFNPSubscribeSucceededNewRoot(long uid, 
PublishStreamKey key, int orderedMessageNum) { // key is unused: the type declares no KEY field
+        Message msg = new Message(FNPSubscribeSucceededNewRoot); // was FNPSubscribeSucceeded, whose ROOT_LOCATION is never set here
+        msg.set(UID, uid);
+        msg.set(ORDERED_MESSAGE_NUM, orderedMessageNum);
+        return msg;
+    }
+
+    public static final MessageType FNPPublishDataCollision = new 
MessageType("FNPPublishDataCollision") {{
+        addField(UID, Long.class);
+        addField(SUGGESTED_SEQNO, Long.class);
+    }};
+    
+    public static final Message createFNPPublishDataCollision(long id, long 
seqNo) {
+        Message msg = new Message(FNPPublishDataCollision);
+        msg.set(UID, id);
+        msg.set(SUGGESTED_SEQNO, seqNo);
+        return msg;
+    }
+    
 //    public static final MessageType FNPSubscribeRequest = new 
MessageType("FNPSubscribeRequest") {{
 //        addField(UID, Long.class);
 //        addField(HTL, Short.class);

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
  2005-10-12 18:56:58 UTC (rev 7424)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
  2005-10-12 19:50:08 UTC (rev 7425)
@@ -14,13 +14,13 @@


     final ClientPublishStreamKey key;
-    final SubscriptionManager sm;
+    final SubscriptionHandler handler;
     boolean finished = false;
     private final LinkedList clientSubs;

-    ClientSubscriptionHandler(SubscriptionManager manager, 
ClientPublishStreamKey key) {
+    ClientSubscriptionHandler(SubscriptionHandler sh, ClientPublishStreamKey 
key) {
         clientSubs = new LinkedList();
-        sm = manager;
+        handler = sh;
         this.key = key;
     }

@@ -34,7 +34,7 @@
         clientSubs.remove(sub);
         if(clientSubs.size() == 0) {
             finished = true;
-            sm.remove(this);
+            handler.remove(this);
         }
     }


Modified: branches/publish-subscribe/freenet/src/freenet/node/InsertSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/InsertSender.java       
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/node/InsertSender.java       
2005-10-12 19:50:08 UTC (rev 7425)
@@ -27,6 +27,15 @@
         this.prb = prb;
         this.fromStore = fromStore;
         this.closestLocation = closestLocation;
+        
+        double myLoc = node.lm.getLocation().getValue();
+        if(Math.abs(target-myLoc) < Math.abs(target-closestLocation)) {
+            closestLocation = myLoc;
+            htl = Node.MAX_HTL;
+        } else {
+            htl = node.decrementHTL(source, htl);
+        }
+        
         Thread t = new Thread(this, "InsertSender for UID "+uid+" on 
"+node.portNumber);
         t.setDaemon(true);
         t.start();
@@ -76,13 +85,8 @@
             // Route it
             PeerNode next;
             // Can backtrack, so only route to nodes closer than we are to 
target.
-            double nextValue;
             synchronized(node.peers) {
                 next = node.peers.closerPeer(source, nodesRoutedTo, target, 
true);
-                if(next != null)
-                    nextValue = next.getLocation().getValue();
-                else
-                    nextValue = -1.0;
             }

             if(next == null) {
@@ -93,11 +97,6 @@
             Logger.minor(this, "Routing insert to "+next);
             nodesRoutedTo.add(next);

-            if(Math.abs(target - nextValue) > Math.abs(target - 
closestLocation)) {
-                Logger.minor(this, "Backtracking: target="+target+" 
next="+nextValue+" closest="+closestLocation);
-                htl = node.decrementHTL(source, htl);
-            }
-            
             Message req = DMT.createFNPInsertRequest(uid, htl, myKey, 
closestLocation);

             // Wait for ack or reject... will come before even a locally 
generated DataReply
@@ -132,6 +131,7 @@
             }

             if(msg.getSpec() == DMT.FNPRejectedLoop) {
+                htl = node.decrementHTL(source, htl);
                 // Loop - we don't want to send the data to this one
                 continue;
             }
@@ -208,6 +208,7 @@
             if(msg.getSpec() == DMT.FNPRouteNotFound) {
                 Logger.minor(this, "Rejected: RNF");
                 // Still gets the data - but not yet
+                htl = node.decrementHTL(source, htl);
                 short newHtl = msg.getShort(DMT.HTL);
                 if(htl > newHtl) htl = newHtl;
                 continue;

Modified: branches/publish-subscribe/freenet/src/freenet/node/Node.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/Node.java       
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/node/Node.java       
2005-10-12 19:50:08 UTC (rev 7425)
@@ -57,8 +57,8 @@

     public static final int PACKETS_IN_BLOCK = 32;
     public static final int PACKET_SIZE = 1024;
-    public static final double DECREMENT_AT_MIN_PROB = 0.2;
-    public static final double DECREMENT_AT_MAX_PROB = 0.1;
+    public static final double DECREMENT_AT_MIN_PROB = 0.25;
+    public static final double DECREMENT_AT_MAX_PROB = 0.5;
     // Send keepalives every 2.5-5.0 seconds
     public static final int KEEPALIVE_INTERVAL = 2500;
     // If no activity for 15 seconds, node is dead
@@ -96,6 +96,8 @@
     private final Hashtable localStreamContexts;

     private final HashSet runningUIDs;
+    final LRUQueue recentlyCompletedIDs;
+    static final int MAX_RECENTLY_COMPLETED_IDS = 10*1000;

     byte[] myIdentity; // FIXME: simple identity block; should be unique
     /** Hash of identity. Used as setup key. */
@@ -111,7 +113,7 @@
     final PacketSender ps;
     final NodeDispatcher dispatcher;
     final String filenamesPrefix;
-    static short MAX_HTL = 10;
+    static short MAX_HTL = 5;
     private static final int EXIT_STORE_FILE_NOT_FOUND = 1;
     private static final int EXIT_STORE_IOEXCEPTION = 2;
     private static final int EXIT_STORE_OTHER = 3;
@@ -674,9 +676,6 @@
         // FIXME support compression when noderefs get big enough for it to be 
useful
     }

-    final LRUQueue recentlyCompletedIDs;
-    static final int MAX_RECENTLY_COMPLETED_IDS = 10*1000;
-    
     /**
      * Has a request completed with this ID recently?
      */

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java     
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java     
2005-10-12 19:50:08 UTC (rev 7425)
@@ -74,27 +74,18 @@
             return handleDataRequest(m);
         } else if(spec == DMT.FNPInsertRequest) {
             return handleInsertRequest(m);
+        } else if(spec == DMT.FNPSubscribeRequest) {
+            return node.subscriptions.handleSubscribeRequest(m);
+        } else if(spec == DMT.FNPResubscribeRequest) {
+            return node.subscriptions.handleResubscribeRequest(m);
         } else if(spec == DMT.FNPPublishData) {
-            return handlePublishData(m);
+            return node.subscriptions.handlePublishData(m);
         }
 //        } // SubscribeData, SubscribeRestarted etc handled by 
SubscribeSender.
         return false;
     }

     /**
-     * Handle an FNPPublishData message.
-     * @return False to put it back onto the queue.
-     */
-    private boolean handlePublishData(Message m) {
-        // Create a PublishSender. This will do all the work.
-        // FIXME maybe we should check whether we've sent the packet before?
-        // It's not really a viable DoS but it is good practice...
-        PublishHandlerSender ps =
-            node.makePublishHandlerSender(m);
-        return true;
-    }
-
-    /**
      * Handle an incoming FNPDataRequest.
      */
     private boolean handleDataRequest(Message m) {

Modified: branches/publish-subscribe/freenet/src/freenet/node/PacketSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/PacketSender.java       
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/node/PacketSender.java       
2005-10-12 19:50:08 UTC (rev 7425)
@@ -51,13 +51,14 @@
                             continue;
                         }

-                        // Any urgent notifications to send?
-                        long urgentTime = pn.getNextUrgentTime();
-                        if(urgentTime <= now) {
-                            // Send them
-                            pn.sendAnyUrgentNotifications();
-                        } else {
-                            nextActionTime = Math.min(nextActionTime, 
urgentTime);
+                        if(node.packetMangler == null) continue;
+                        // Any messages to send?
+                        MessageItem[] messages = null;
+                        messages = pn.grabQueuedMessageItems();
+                        if(messages != null) {
+                            // Send packets, right now, blocking, including 
any active notifications
+                            
node.packetMangler.processOutgoingOrRequeue(messages, pn, true);
+                            continue;
                         }

                         // Any packets to resend?
@@ -88,14 +89,13 @@

                         }

-                        if(node.packetMangler == null) continue;
-                        // Any messages to send?
-                        MessageItem[] messages = null;
-                        messages = pn.grabQueuedMessageItems();
-                        if(messages != null) {
-                            // Send packets, right now, blocking, including 
any active notifications
-                            
node.packetMangler.processOutgoingOrRequeue(messages, pn, true);
-                            continue;
+                        // Any urgent notifications to send?
+                        long urgentTime = pn.getNextUrgentTime();
+                        if(urgentTime <= now) {
+                            // Send them
+                            pn.sendAnyUrgentNotifications();
+                        } else {
+                            nextActionTime = Math.min(nextActionTime, 
urgentTime);
                         }

                         // Need to send a keepalive packet?

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/PublishHandlerSender.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/PublishHandlerSender.java   
    2005-10-12 18:56:58 UTC (rev 7424)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/PublishHandlerSender.java   
    2005-10-12 19:50:08 UTC (rev 7425)
@@ -78,8 +78,14 @@
         closestLocation = m.getDouble(DMT.NEAREST_LOCATION);
         htl = m.getShort(DMT.HTL);
         double myLoc = n.lm.getLocation().getValue();
-        if(Math.abs(myLoc - target) < Math.abs(closestLocation - target))
+        if(Math.abs(myLoc - target) < Math.abs(closestLocation - target)) {
             closestLocation = myLoc;
+            // got new closest location; if reset location, reset HTL
+            htl = Node.MAX_HTL;
+        } else {
+            // backtracking
+            htl = node.decrementHTL(source, htl);
+        }
         t = new Thread(this);
         t.setDaemon(true);
         t.start();
@@ -92,7 +98,7 @@
             // Copy it as it will be decrypted in place
             byte[] packetDataCopy = new byte[packetData.length];
             System.arraycopy(packetData, 0, packetDataCopy, 0, 
packetData.length);
-            node.subscriptions.receivedPacket(key, packetNumber, 
packetDataCopy, source);
+            //node.subscriptions.receivedPacket(key, packetNumber, 
packetDataCopy, source);
             short origHTL = htl;
             if(source != null) {
                 Message msg = DMT.createFNPAccepted(uid);
@@ -152,11 +158,6 @@
                 Logger.minor(this, "Routing insert to "+next);
                 nodesRoutedTo.add(next);

-                if(Math.abs(target - nextValue) > Math.abs(target - 
closestLocation)) {
-                    Logger.minor(this, "Backtracking: target="+target+" 
next="+nextValue+" closest="+closestLocation);
-                    htl = node.decrementHTL(source, htl);
-                }
-                
                 // Now we send it
                 /**
                  * Possible responses:
@@ -165,7 +166,7 @@
                  * RejectedLoop
                  * (timeout)
                  */
-                Message msg = DMT.createFNPPublishData(htl, packetData, key, 
packetNumber, uid, closestLocation);
+                Message msg = DMT.createFNPPublishData(packetData, key, 
packetNumber, uid);

                 MessageFilter mfAccepted = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
                 MessageFilter mfRejectedLoop = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);

Modified: branches/publish-subscribe/freenet/src/freenet/node/RequestSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/RequestSender.java      
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/node/RequestSender.java      
2005-10-12 19:50:08 UTC (rev 7425)
@@ -74,6 +74,14 @@
         this.nearestLoc = nearestLoc;

         target = key.toNormalizedDouble();
+        double myLoc = node.lm.getLocation().getValue();
+        if(Math.abs(myLoc-target) < Math.abs(nearestLoc-target)) {
+            nearestLoc = myLoc;
+            htl = Node.MAX_HTL;
+        } else {
+            htl = node.decrementHTL(source, htl);
+        }
+        
         Thread t = new Thread(this, "RequestSender for UID "+uid);
         t.setDaemon(true);
         t.start();
@@ -87,22 +95,14 @@
         while(true) {
             Logger.minor(this, "htl="+htl);
             if(htl == 0) {
-                // RNF
-                // Would be DNF if arrived with no HTL
-                // But here we've already routed it and that's been rejected.
-                finish(ROUTE_NOT_FOUND);
+                finish(DATA_NOT_FOUND);
                 return;
             }

             // Route it
             PeerNode next;
-            double nextValue;
             synchronized(node.peers) {
                 next = node.peers.closerPeer(source, nodesRoutedTo, target, 
true);
-                if(next != null)
-                    nextValue = next.getLocation().getValue();
-                else
-                    nextValue = -1.0;
             }

             if(next == null) {
@@ -113,11 +113,6 @@
             Logger.minor(this, "Routing insert to "+next);
             nodesRoutedTo.add(next);

-            if(Math.abs(target - nextValue) > Math.abs(target - nearestLoc)) {
-                Logger.minor(this, "Backtracking: target="+target+" 
next="+nextValue+" closest="+nearestLoc);
-                htl = node.decrementHTL(source, htl);
-            }
-            
             Message req = DMT.createFNPDataRequest(uid, htl, key, nearestLoc);

             /**
@@ -155,6 +150,7 @@
             }

             if(msg.getSpec() == DMT.FNPRejectedLoop) {
+                htl = node.decrementHTL(source, htl);
                 // Find another node to route to
                 continue;
             }
@@ -195,9 +191,15 @@
             }

             if(msg.getSpec() == DMT.FNPRouteNotFound) {
-                // Backtrack within available hops
-                short newHtl = msg.getShort(DMT.HTL);
-                if(newHtl < htl) htl = newHtl;
+                // Backtrack within available hops.
+                // We DO NOT include a nearestLoc on an RNF.
+                // The reason for this is that it would suck,
+                // as we *probably* haven't actually gotten any 
+                // closer to the target.
+                short newHTL = msg.getShort(DMT.HTL);
+                short oldHTL = node.decrementHTL(source, htl);
+                htl = oldHTL;
+                if(newHTL < oldHTL) htl = newHTL;
                 continue;
             }


Modified: 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java    
    2005-10-12 18:56:58 UTC (rev 7424)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java    
    2005-10-12 19:50:08 UTC (rev 7425)
@@ -1,9 +1,20 @@
 package freenet.node;

+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Random;
+
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.keys.ClientPublishStreamKey;
 import freenet.keys.PublishStreamKey;
 import freenet.support.Logger;
 import freenet.support.NumberedItem;
 import freenet.support.NumberedRecentItems;
+import freenet.support.ShortBuffer;

 /**
  * A single subscription.
@@ -14,108 +25,668 @@

     static final int KEEP_PACKETS = 32;

+    /** Our Node */
+    final Node node;
+    final SubscriptionManager manager;
     final PublishStreamKey key;
-    ClientSubscriptionHandler localSubscribers;
-    PeerNode[] subscriberPeers;
-    final NumberedRecentItems packets;
+    final Random random;
+    // Small, rarely modified => []
+    private ClientSubscriptionHandler[] localSubscribers;
+    /** Nodes which have subscribed to the stream through us, whether
+     * or not we are currently subscribed. Also records the HTL, ID
+     * etc they used, so that if one subscription fails, we can try
+     * the next.
+     */
+    private SubscribeHandler[] subscriberPeerHandlers;
+    private PeerNode parent;
+    private final NumberedRecentItems packets;
+    /** At most one concurrent SubscribeRequest being routed */
+    private SubscribeSender sender;
+    /** Are we currently subscribed? */
+    private boolean subscribed;
+    /** Are we currently restarting? */
+    private boolean restarting;
+    /** Our restart UID */
+    private long restartUID;
+    /** Is upstream restarting? */
+    private boolean upstreamRestarting;
+    /** Upstream's restart UID */
+    private long upstreamRestartUID;
+    /** Removing from parent? */
+    private boolean dropping = false;
+    /** Location of root (root = self, if we are root) */
+    private double rootLocation;

-    public SubscriptionHandler(PublishStreamKey k) {
+    /**
+     * On creation, we set restarting = true,
+     * but we do not actually start routing a SubscribeRequest,
+     * because we do not yet have the parameters to do so.
+     * We expect to be fed them by either a call to subscribe(), or
+     * a call to handleSubscriptionRequest().
+     * @param k
+     */
+    SubscriptionHandler(SubscriptionManager mangler, PublishStreamKey k, 
Random random) {
         key = k;
-        subscriberPeers = null;
+        node = mangler.node;
+        manager = mangler;
+        subscriberPeerHandlers = null;
+        parent = null;
         packets = new NumberedRecentItems(KEEP_PACKETS, true);
+        sender = null;
+        subscribed = false;
+        restarting = false;
+        // Will be enabled when we actually start restarting
+        upstreamRestarting = false;
+        upstreamRestartUID = -1;
+        this.random = random;
     }

+    /**
+     * @return True if nothing is attached to this subscription any more:
+     * there are neither subscriber peer handlers nor local subscribers.
+     */
+    public synchronized boolean shouldDrop() {
+        boolean havePeers = subscriberPeerHandlers != null && subscriberPeerHandlers.length > 0;
+        if(havePeers) return false;
+        boolean haveLocal = localSubscribers != null && localSubscribers.length > 0;
+        return !haveLocal;
+    }
+    }
+
     /**
-     * Set the local subscribers handler.
-     * @return The previous local subscribers handler - should be null!
+     * Remove a node from the list of subscribers.
+     * Can be called while dropping.
+     * @param pn The node to remove.
      */
-    public synchronized ClientSubscriptionHandler 
setLocal(ClientSubscriptionHandler csh) {
-        ClientSubscriptionHandler h = localSubscribers;
-        localSubscribers = csh;
-        return h;
+    void removeSubscriberHandler(SubscribeHandler handler) {
+        Logger.minor(this, "Removing subscriber node "+handler+" from "+this);
+        synchronized(this) {
+            if(subscriberPeerHandlers == null) return;
+            int count = 0;
+            for(int i=0;i<subscriberPeerHandlers.length;i++) {
+                if(!(subscriberPeerHandlers[i] == handler || 
subscriberPeerHandlers[i] == null))
+                    count++;
+            }
+            SubscribeHandler[] newPeers = new SubscribeHandler[count];
+            int x=0;
+            for(int i=0;i<subscriberPeerHandlers.length;i++) {
+                if(!(subscriberPeerHandlers[i] == handler || 
subscriberPeerHandlers[i] == null))
+                    newPeers[x++] = subscriberPeerHandlers[i];
+            }
+            subscriberPeerHandlers = newPeers;
+            if(!shouldDrop()) return;
+        }
+        drop();
     }

-    public synchronized boolean shouldDrop() {
-        return subscriberPeersEmpty() && localSubscribers == null;
+    public class PacketItem implements NumberedItem {
+
+        long packetNumber;
+        byte[] data;
+        
+        public PacketItem(long packetNumber, byte[] packetData) {
+            this.packetNumber = packetNumber;
+            data = packetData;
     }

     /**
-     * @return True if there are no subscriber peers.
+         * Forward this packet to a subscriber node.
+         * As an FNPSubscribeData. NOT an FNPPublishData.
+         * DO NOT CALL WITH LOCKS HELD, as sends packets.
+         * @param pn Node to send to.
      */
-    private final boolean subscriberPeersEmpty() {
-        PeerNode[] peers = subscriberPeers;
-        return (peers == null || peers.length == 0);
+        public void forwardTo(PeerNode pn) throws NotConnectedException {
+            Message msg = DMT.createFNPSubscribeData(key, packetNumber, data);
+            pn.sendAsync(msg, null);
     }

-    public void addSubscriberNode(PeerNode pn, long lastSeen) {
+        public long getNumber() {
+            return packetNumber;
+        }
+
+    }
+
+    /**
+     * Handle a PublishData packet:
+     * <li>If we have the same packet, with the
+     * same contents, return FNPPublishSucceeded.</li>
+     * <li>If we have the same packet with
+     * different contents, return FNPPublishCollision with a suggested
+     * next packet number.</li>
+     * <li>If we are not the root, forward to our parent, and wait
+     * for FNPPublishSucceeded or FNPPublishCollision.</li>
+     * <li>If we are the root, and we don't have that packet number,
+     * accept it; FNPPublishSucceeded. Then broadcast it to all other
+     * nodes on the tree.</li>
+     * @return True unless we want the message to be put back
+     * onto the queue.
+     */
+    public boolean handlePublishData(long id, PeerNode source, Message m) 
throws DroppingSubscriptionHandlerException {
+        long packetNumber = m.getLong(DMT.PACKET_NO);
+        byte[] data = ((ShortBuffer) m.getObject(DMT.DATA)).getData();
+        PacketItem item;
+        boolean isRoot;
+        boolean isRestarting;
+        PacketItem myItem = null;
         synchronized(this) {
-            if(subscriberPeers == null || subscriberPeers.length == 0) {
-                subscriberPeers = new PeerNode[] { pn };
+            if(dropping) throw new DroppingSubscriptionHandlerException();
+            item = (PacketItem) packets.get(packetNumber);
+            isRoot = isRoot();
+            isRestarting = restarting || upstreamRestarting;
+            if(isRoot && item == null) {
+                myItem = new PacketItem(packetNumber, data);
+                packets.add(myItem);
+            }
+        }
+        if(item != null) {
+            // Equal contents?
+            if(Arrays.equals(item.data, data)) {
+                try {
+                    // Already have the packet; success
+                    source.sendAsync(DMT.createFNPPublishDataSucceeded(id), 
null);
+                } catch (NotConnectedException e) {
+                    Logger.minor(this, "Not connected while sending success 
because already have packet to "+
+                            source+" ("+key+","+packetNumber+")");
+                }
             } else {
-                PeerNode[] peers = new PeerNode[subscriberPeers.length+1];
-                System.arraycopy(subscriberPeers, 0, peers, 0, 
subscriberPeers.length);
-                peers[peers.length-1] = pn;
-                subscriberPeers = peers;
+                // Collision
+                try {
+                    source.sendAsync(DMT.createFNPPublishDataCollision(id, 
packets.getLastNumber()+1), null);
+                } catch (NotConnectedException e) {
+                    Logger.normal(this, "Not connected while sending 
collision: "+
+                            source+" ("+key+","+packetNumber+")");
             }
         }
-        NumberedItem[] items = packets.getAfter(lastSeen);
-        if(items == null || items.length == 0) return;
-        for(int i=0;i<items.length;i++) {
-            PacketItem item = (PacketItem)items[i];
+        } else {
+            if(isRestarting) {
+                try {
+                    
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(id), null);
+                } catch (NotConnectedException e) {
+                    Logger.normal(this, "Disconnected while sending rejection 
due to restart: "+
+                            source+" ("+key+","+packetNumber+")");
+                }
+            } else if(isRoot) {
+                // Added new item already, success
+                try {
+                    source.sendAsync(DMT.createFNPPublishDataSucceeded(id), 
null);
+                } catch (NotConnectedException e) {
+                    Logger.normal(this, "Disconnected while adding new packet: 
"+
+                            source+" ("+key+","+packetNumber+")");
+                }
+                // Send to all other nodes; those below source will send when 
get success message
+                broadcast(myItem, source);
+            } else {
+                // Forward to parent and wait for success/failure - off thread!
+                ForwardPublishUpTree forwarder = new ForwardPublishUpTree(id, 
source, packetNumber, data, m);
+                Thread t = new Thread(forwarder);
+                t.setDaemon(true); // FIXME should this block exit?
+                t.start();
+                return true; // don't unlockUID yet
+            }
+        }
+        // In all cases...
+        source.node.unlockUID(id);
+        return true;
+    }
+
+    /**
+     * Forward a PacketItem to all subscriber peers, except for the one given.
+     * The item must already have been added to the packets cache.
+     * Also sends to any local subscribers.
+     */
+    private void broadcast(PacketItem item, PeerNode ignore) {
+        SubscribeHandler[] peers;
+        synchronized(this) {
+            peers = subscriberPeerHandlers;
+        }
+        if(peers == null) return;
+        for(int i=0;i<peers.length;i++) {
+            SubscribeHandler sub = peers[i];
+            if(sub == null) continue;
+            PeerNode pn = sub.subscriber;
+            if(pn != ignore) {
+                try {
             item.forwardTo(pn);
+                } catch (NotConnectedException e) {
+                    Logger.minor(this, "Lost connection to subscriber peer 
"+pn+" when forwarding packet "+item);
+                    removeSubscriberHandler(sub);
+                }
         }
     }
+        ClientSubscriptionHandler[] handlers;
+        synchronized(this) {
+            handlers = localSubscribers;
+        }
+        if(handlers != null) {
+            for(int i=0;i<handlers.length;i++)
+                handlers[i].processPacket(item.packetNumber, item.data);
+        }
+    }
+
+    /**
+     * @return True if we have no parent and are not currently restarting.
+     */
+    private synchronized boolean isRoot() {
+        if(restarting) return false;
+        return parent == null;
+    }

     /**
-     * Process an incoming PublishData packet.
+     * Forward a PublishData to our parent.
+     * Wait for a PublishDataSucceeded or a PublishDataCollision.
+     * If the former, broadcast. If the latter, return to sender.
      */
-    public void processPacket(long packetNumber, byte[] packetData, PeerNode 
source) {
-        // First, have we seen it before?
-        PacketItem item = new PacketItem(packetNumber, packetData);
-        if(!packets.add(item)) {
-            Logger.minor(this, "Got packet "+packetNumber+" on stream "+key+" 
twice");
+    private class ForwardPublishUpTree implements Runnable {
+
+        final long uid;
+        final long packetNumber;
+        final PeerNode source;
+        final byte[] contents;
+        final Message origMessage;
+        // Yet another arbitrary timeout. Should be plenty considering these 
are supposed to be real time streams!
+        public static final int TIMEOUT = 30000;
+        
+        public ForwardPublishUpTree(long id, PeerNode src, long packetNo, 
byte[] data, Message m) {
+            uid = id;
+            packetNumber = packetNo;
+            source = src;
+            contents = data;
+            origMessage = m;
+        }
+
+        public void run() {
+            try {
+            Logger.minor(this, "Running "+this);
+            PeerNode myParent;
+            while(true) {
+            synchronized(this) {
+                myParent = parent;
+                if(restarting) myParent = null;
+                if(upstreamRestarting) myParent = null;
+            }
+            if(myParent == null) {
+                // We just became root, or we restarted
+                handlePublishData(uid, source, origMessage);
+                return;
+            }
+            // Send upwards
+            Message msg = DMT.createFNPPublishData(contents, key, 
packetNumber, uid);
+            try {
+                myParent.sendAsync(msg, null);
+            } catch (NotConnectedException e) {
+                lostConnection();
+                return;
+            }
+            // Wait for reply
+            MessageFilter mfSuccess = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataSucceeded);
+            MessageFilter mfRestarting = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataRejectedRestarting);
+            MessageFilter mfCollision = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataCollision);
+            MessageFilter mfInvalid = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataInvalid);
+            MessageFilter mfOverload = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPRejectedOverload);
+            MessageFilter mfLoop = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPRejectedLoop);
+            
+            MessageFilter mf = 
mfSuccess.or(mfRestarting.or(mfCollision.or(mfInvalid.or(mfOverload.or(mfLoop)))));
+            
+            Message reply;
+            
+            try {
+                reply = source.node.usm.waitFor(mf);
+            } catch (DisconnectedException e1) {
+                lostConnection();
             return;
         }
-        PeerNode[] peers;
-        // We don't strictly need to synchronize, but
-        // if we don't we may lose packets.
+            
+            if(reply == null && !myParent.isConnected()) {
+                Logger.error(this, "Did not get notified of parent 
disconnection!");
+                lostConnection();
+                return;
+            }
+            
+            if(reply == null || reply.getSpec() == DMT.FNPRejectedOverload) {
+                // Timeout
+                Logger.error(this, "Timeout - "+myParent+" did not respond in 
"+TIMEOUT+"ms forwarding PublishData");
+                try {
+                    source.sendAsync(DMT.createFNPRejectedOverload(uid), null);
+                } catch (NotConnectedException e1) {
+                    Logger.normal(this, "Disconnected while sending rejected 
due to overload: "+
+                            source+" ("+key+","+packetNumber+")");
+                }
+                return;
+            }
+            
+            if(reply.getSpec() == DMT.FNPRejectedLoop) {
         synchronized(this) {
-            peers = subscriberPeers;
+                    if(myParent != parent) continue;
+                }
+                Logger.error(this, "Loop rejection in forwarding PublishData 
to parent!");
+                // Tree topology is completely broken
+                forceSubscribe();
+                try {
+                    
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid), null);
+                } catch (NotConnectedException e) {
+                    Logger.normal(this, "Disconnected while sending rejection 
due to restart due to loop: "+
+                            source+" ("+key+","+packetNumber+")");
+                }
+                return;
         }
-        if(peers != null)
-            for(int i=0;i<peers.length;i++)
-                item.forwardTo(peers[i]);

-        // Redistribute it to local subscribers
-        localSubscribers.processPacket(packetNumber, packetData);
+            if(reply.getSpec() == DMT.FNPPublishDataInvalid) {
+                // FIXME check it
+                try {
+                    source.sendAsync(DMT.createFNPPublishDataInvalid(uid), 
null);
+                } catch (NotConnectedException e) {
+                    Logger.normal(this, "Disconnected while sending invalid 
data rejection: "+
+                            source+" ("+key+","+packetNumber+")");
+                }
+                return;
     }

-    public class PacketItem implements NumberedItem {
+            if(reply.getSpec() == DMT.FNPPublishDataRejectedRestarting) {
+                // FIXME enforcement - has it recently said it's restarting?
+                try {
+                    
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid), null);
+                } catch (NotConnectedException e) {
+                    Logger.normal(this, "Disconnected while relaying 
rejected:restarting: "+
+                            source+" ("+key+","+packetNumber+")");
+                }
+                return;
+            }

-        long packetNumber;
-        byte[] data;
+            if(reply.getSpec() == DMT.FNPPublishDataCollision) {
+                // Yay finally a nontrivial failure mode!
+                long suggested = reply.getLong(DMT.SUGGESTED_SEQNO);
+                suggested = Math.max(suggested, packets.getLastNumber()+1);
+                try {
+                    source.sendAsync(DMT.createFNPPublishDataCollision(uid, 
suggested), null);
+                } catch (NotConnectedException e) {
+                    Logger.normal(this, "Not connected while sending 
collision: "+
+                            source+" ("+key+","+packetNumber+")");
+                }
+                return;
+            }
+            
+            if(reply.getSpec() == DMT.FNPPublishDataSucceeded) {
+                // Success!
+                PacketItem item = new PacketItem(packetNumber, contents);
+                packets.add(item);
+                try {
+                    source.sendAsync(DMT.createFNPPublishDataSucceeded(uid), 
null);
+                } catch (NotConnectedException e) {
+                    Logger.normal(this, "Disconnected while adding new packet: 
"+
+                            source+" ("+key+","+packetNumber+")");
+                }
+                broadcast(item, source);
+                return;
+            }
+            
+            Logger.error(this, "Unrecognized packet: "+reply);
+            return;
+            }
+            } catch (Throwable t) {
+                Logger.error(this, "Caught "+t, t);
+            } finally {
+                source.node.unlockUID(uid);
+                source.node.completed(uid);
+            }
+        }
+
+        private void lostConnection() {
+            Logger.error(this, "Lost parent node");
+            try {
+                
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid), null);
+            } catch (NotConnectedException e1) {
+                Logger.normal(this, "Disconnected while sending rejection due 
to restart: "+
+                        source+" ("+key+","+packetNumber+")");
+            }
+        }

-        public PacketItem(long packetNumber, byte[] packetData) {
-            this.packetNumber = packetNumber;
-            data = packetData;
         }

         /**
-         * Forward this packet to a subscriber node.
-         * As an FNPSubscribeData. NOT an FNPPublishData.
-         * DO NOT CALL WITH LOCKS HELD, as sends packets.
-         * @param pn Node to send to.
+     * Handle a SubscribeRequest. Do not route it. Add the
+     * node to the list of subscriber nodes and send it our status.
+     * Caller should have checked ID for loops.
+     * @param m The message to reply to.
+     * @throws DroppingSubscriptionHandlerException If the node is currently 
being dropped
+     * from the parent.
          */
-        public void forwardTo(PeerNode pn) {
+    public void handleSubscribeRequest(Message m) throws 
DroppingSubscriptionHandlerException {
+        long id = m.getLong(DMT.UID);
+        short htl = m.getShort(DMT.HTL);
+        long lastSeen = m.getLong(DMT.LAST_SEQNO);
+        int orderedMessagesSeq = m.getInt(DMT.ORDERED_MESSAGE_NUM);
+        PeerNode source = (PeerNode) m.getSource();
+        NumberedItem[] items; // packets sender has missed
+        
+        // Reset the HTL
+        double origNearest = m.getDouble(DMT.NEAREST_LOCATION);
+        double nearest = origNearest;
+        double myLoc = source.node.lm.getLocation().getValue();
+        double target = key.toNormalizedDouble();
+        if(Math.abs(myLoc-target) < Math.abs(nearest-target)) {
+            htl = Node.MAX_HTL;
+            nearest = myLoc;
+        } else {
+            htl = source.node.decrementHTL(source, htl);
+        }

+        boolean nowRestarting = false;
+        SubscribeHandler handler;
+        try {
+            synchronized(this) {
+                if(isSubscribed(source)) {
+                    Logger.error(this, "Node subscribing twice: "+source);
+                    return;
+                }
+                if(dropping) throw new DroppingSubscriptionHandlerException();
+                
+                // Collect the old packets to send when accepted
+                items = packets.getAfter(lastSeen);
+                
+                // Will tell client node current status
+                handler = new SubscribeHandler(node, id, htl, source, 
orderedMessagesSeq, origNearest, nearest, this, items);
+                
+                if((!subscribed) && (!restarting)) {
+                    restarting = true;
+                    nowRestarting = true;
+                }
+            }
+            if(nowRestarting) {
+                // Create sender
+                startSubscribe(handler);
+            }
+        } finally {
+            if(nowRestarting && sender == null) {
+                restarting = false;
+            }
+        }
+        
+        try {
+            if(items != null && items.length > 0) {
+                for(int i=0;i<items.length;i++) {
+                    PacketItem item = (PacketItem)items[i];
+                    item.forwardTo(source);
+                }
+            }
+        } catch (NotConnectedException e) {
+            Logger.error(this, "New subscriber node disconnected: "+source);
+            removeSubscriberHandler(handler);
+        }
+    }
+
+    /**
+     * Is the node already subscribed?
+     * @param source The peer to look for.
+     * @return True if one of our subscriber handlers belongs to source;
+     * false otherwise, including when there are no subscriber handlers yet.
+     */
+    private synchronized boolean isSubscribed(PeerNode source) {
+        // The constructor leaves subscriberPeerHandlers null until the first
+        // subscriber is added, so the very first request must not NPE here.
+        if(subscriberPeerHandlers == null) return false;
+        for(int i=0;i<subscriberPeerHandlers.length;i++) {
+            SubscribeHandler sub = subscriberPeerHandlers[i];
+            // Other code in this class tolerates null slots; do the same.
+            if(sub == null) continue;
+            if(sub.subscriber == source) return true;
+        }
+        return false;
+    }
+
+    /**
+     * Start a subscribe request using the original ID, HTL, nearest
+     * location and subscriber peer recorded by the given handler.
+     */
+    private void startSubscribe(SubscribeHandler handler) {
+        startSubscribe(handler.origID, handler.origHTL, handler.origNearestLoc, handler.subscriber);
+    }
+
+    /**
+     * Create a SubscribeSender and run it on a new daemon thread, unless
+     * we are already subscribed or a sender already exists.
+     * The arguments are passed straight through to the SubscribeSender.
+     */
+    private void startSubscribe(long id, short htl, double nearest, PeerNode source) {
+        // Keep a local reference: the sender field may be changed by
+        // another thread as soon as we release the lock, so it must not
+        // be re-read when constructing the thread.
+        SubscribeSender newSender;
+        synchronized(this) {
+            if(subscribed) return;
+            if(sender != null) return;
+            newSender = new SubscribeSender(id, htl, source, nearest, this);
+            sender = newSender;
+        }
+        Thread t = new Thread(newSender);
+        t.setDaemon(true);
+        t.start();
+    }
+    
+    /**
+     * Append a subscriber handler to subscriberPeerHandlers.
+     * Does nothing if this exact handler is already in the list. Logs an
+     * error (but still adds) if a different handler for the same
+     * subscriber node is already present.
+     * @param sub The handler to add.
+     */
+    synchronized void addSubscriberHandler(SubscribeHandler sub) {
+        if(subscriberPeerHandlers != null) {
+            for(int i=0;i<subscriberPeerHandlers.length;i++) {
+                SubscribeHandler s = subscriberPeerHandlers[i];
+                // Null slots are tolerated elsewhere in this class
+                if(s == null) continue;
+                // Already registered: appending again would duplicate
+                // the handler, so stop here rather than fall through.
+                if(s == sub) return;
+                if(s.subscriber == sub.subscriber) {
+                    Logger.error(this, "Subscribing the same node twice", new Exception("error"));
+                }
+            }
+        }
+        // Small, rarely modified array: copy and grow by one
+        if(subscriberPeerHandlers == null || subscriberPeerHandlers.length == 0) {
+            subscriberPeerHandlers = new SubscribeHandler[] { sub };
+        } else {
+            SubscribeHandler[] peers = new SubscribeHandler[subscriberPeerHandlers.length+1];
+            System.arraycopy(subscriberPeerHandlers, 0, peers, 0, subscriberPeerHandlers.length);
+            peers[peers.length-1] = sub;
+            subscriberPeerHandlers = peers;
+        }
+    }
+
+    /**
+     * Find the local ClientSubscriptionHandler whose key equals key2, or
+     * create one and append it to localSubscribers.
+     * @param key2 The client key to look up.
+     * @return The existing or newly created handler.
+     * @throws DroppingSubscriptionHandlerException If we are being dropped.
+     */
+    public synchronized ClientSubscriptionHandler makeClientSubscriptionHandler(
+            ClientPublishStreamKey key2) throws DroppingSubscriptionHandlerException {
+        if(dropping) throw new DroppingSubscriptionHandlerException();
+        int oldCount = (localSubscribers == null) ? 0 : localSubscribers.length;
+        for(int i=0;i<oldCount;i++) {
+            ClientSubscriptionHandler existing = localSubscribers[i];
+            if(existing.getKey().equals(key2)) return existing;
+        }
+        // Not found: build a new array one slot larger and put it last
+        ClientSubscriptionHandler created = new ClientSubscriptionHandler(this, key2);
+        ClientSubscriptionHandler[] grown = new ClientSubscriptionHandler[oldCount + 1];
+        if(oldCount > 0)
+            System.arraycopy(localSubscribers, 0, grown, 0, oldCount);
+        grown[oldCount] = created;
+        localSubscribers = grown;
+        return created;
+    }
+
+    /**
+     * Remove a local subscriber, then drop this handler if nothing is
+     * left subscribed. Can be called while dropping.
+     * @param handler The local subscriber to remove.
+     */
+    public void remove(ClientSubscriptionHandler handler) {
+        synchronized(this) {
+            if(localSubscribers == null) {
+                Logger.error(this, "Removing "+handler+" but no subscribers!");
+                // Could happen after a throwable was caught perhaps?
+                // Nothing to rebuild; fall through to the drop check
+                // instead of dereferencing the null array.
+            } else {
+                // First pass: count survivors and check handler is present
+                int x = 0;
+                boolean seen = false;
+                for(int i=0;i<localSubscribers.length;i++) {
+                    ClientSubscriptionHandler csh = localSubscribers[i];
+                    if(csh != handler) x++;
+                    else seen = true;
+                }
+                if(!seen)
+                    Logger.error(this, "Removing "+handler+" but not found in subscriber list");
+                // Second pass: copy survivors into a right-sized array
+                ClientSubscriptionHandler[] handlers = new ClientSubscriptionHandler[x];
+                x=0;
+                for(int i=0;i<localSubscribers.length;i++) {
+                    ClientSubscriptionHandler csh = localSubscribers[i];
+                    if(csh != handler) handlers[x++] = csh;
+                }
+                localSubscribers = handlers;
+            }
+            if(!shouldDrop()) return;
+        }
+        // Outside the lock: drop() calls into the manager
+        drop();
+    }
+
+    /**
+     * Drop ourself from the SubscriptionManager.
+     * Does nothing if shouldDrop() is false; otherwise sets the dropping
+     * flag under the lock and then calls the manager outside it.
+     */
+    private void drop() {
+        synchronized(this) {
+            if(!shouldDrop()) return;
+            dropping = true;
+        }
+        manager.drop(this, key);
+    }
+
+    /**
+     * Create a new subscribe request, unless we are already restarting.
+     * startSubscribe() itself declines to create a sender if we are
+     * already subscribed or a sender already exists; in that case the
+     * restarting flag is cleared again.
+     * @throws DroppingSubscriptionHandlerException If we are being dropped.
+     */
+    public void forceSubscribe() throws DroppingSubscriptionHandlerException {
+        synchronized(this) {
+            if(dropping) throw new DroppingSubscriptionHandlerException();
+            if(restarting) return;
+            restartUID = random.nextLong();
+            restarting = true;
+        }
+        try {
+            startSubscribe(random.nextLong(), Node.MAX_HTL, node.lm.getLocation().getValue(), null);
+        } finally {
+            // No sender was created (or creating it threw): undo the flag.
+            // Take the lock, since sender and restarting are read and
+            // written under it everywhere else.
+            synchronized(this) {
+                if(sender == null)
+                    restarting = false;
+            }
+        }
+    }
+
+    /**
+     * Callback from SubscribeSender, indicating a change in status.
+     * @param sender2
+     * @param status
+     */
+    void statusChange(SubscribeSender sender2, int status) {
             // TODO Auto-generated method stub

         }

-        public long getNumber() {
-            return packetNumber;
+    synchronized long getRestartUID() {
+        return restartUID;
         }

+    public synchronized double targetLocation() {
+        return key.toNormalizedDouble();
     }

+    /**
+     * @return The current value of the subscribed flag. (The original
+     * note says this is true if we are currently subscribed or root;
+     * the root case cannot be confirmed from this method alone.)
+     */
+    public synchronized boolean isSubscribed() {
+        boolean current = subscribed;
+        return current;
+    }
+
+    /**
+     * @return The recorded location of the root node. Per the field's
+     * own comment, this is our own location if we are the root.
+     */
+    public synchronized double rootLoc() {
+        double loc = rootLocation;
+        return loc;
+    }
+
+    /**
+     * @return The last sequence number we received, for purposes of
+     * subscribing, as reported by packets.getLastContiguousSeqNum().
+     * This is not necessarily the greatest seq# we have seen.
+     * The intended calculation is:
+     * - With no cached packets, -1. (Negative values are invalid; this
+     *   means we want everything, in order).
+     * - If the cached packets are entirely contiguous, the number of the
+     *   highest packet (taking into account wraparound).
+     * - Otherwise, the number of the packet before the first
+     *   noncontiguous packet.
+     */
+    public long getLastSeqNum() {
+        long lastContiguous = packets.getLastContiguousSeqNum();
+        return lastContiguous;
+    }
 }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java    
    2005-10-12 18:56:58 UTC (rev 7424)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java    
    2005-10-12 19:50:08 UTC (rev 7425)
@@ -2,6 +2,9 @@

 import java.util.HashMap;

+import freenet.io.comm.DMT;
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
 import freenet.keys.ClientPublishStreamKey;
 import freenet.keys.PublishStreamKey;
 import freenet.support.Logger;
@@ -18,55 +21,48 @@
     // We have up to 32 subscriptions
     private final int MAX_COUNT = 32;

-    private final Node node;
+    final Node node;
     /** map: key -> sub. definitively has all subs. */
     private final HashMap subscriptionsByKey;
 //    /** map: parent node -> linkedlist of subs. does not include 
parent==null i.e. parent==this. */
 //    private final HashMap subscriptionsByParent;
 //    /** map: child node -> linkedlist of subs. some subs may have no 
children so won't be included. */
 //    private final HashMap subscriptionsByChildren;
-    /** Client subscriptions. These are the ClientSubscriptionHandler's. */
-    private final HashMap clientSubscriptionsByKey;

     SubscriptionManager(Node n) {
         node = n;
         subscriptionsByKey = new HashMap();
 //        subscriptionsByParent = new HashMap();
 //        subscriptionsByChildren = new HashMap();
-        clientSubscriptionsByKey = new HashMap();
     }

     /**
      * Local subscription.
      * Add the stream if necessary, and subscribe to it.
      * @return Null if cannot subscribe, otherwise the ClientSubscription.
+     * 
+     * Architecture:
+     * - Find (or make, and start a subscribe) the SubscriptionHandler.
+     * - Find (or make) the ClientSubscriptionHandler
+     * - Find (or make) the ClientSubscriptionImpl
      */
     public ClientSubscription localSubscribe(ClientPublishStreamKey key, 
SubscriptionCallback cb) {
-        // FIXME implement doing a sub request
-        // For now we will just eavesdrop on locally passed subs.
-        // Each pretends it is the root
-        ClientSubscriptionHandler csh;
-        boolean add = false;
-        Logger.minor(this, "Subscribing locally to "+key);
-        synchronized(this) {
-            csh = (ClientSubscriptionHandler) 
clientSubscriptionsByKey.get(key);
-            if(csh == null) {
-                csh = new ClientSubscriptionHandler(this, key);
-                add = true;
-                clientSubscriptionsByKey.put(key, csh);
-            }
-        }
-        if(add) {
-            Logger.minor(this, "New subscription to "+key);
+        while(true) {
+            try {
             SubscriptionHandler sub = makeSubscription(key.getKey());
-            if(sub.setLocal(csh) != null) {
-                Logger.error(this, "Had local already! for "+key);
-            }
-        }
+                if(sub == null) 
+                    return null; // too many streams
+                
+                ClientSubscriptionHandler csh =
+                    sub.makeClientSubscriptionHandler(key);
+                
         ClientSubscriptionImpl csi = new ClientSubscriptionImpl(key, cb, csh);
         csh.add(csi);
-        // FIXME implement the rest - especially sending sub reqs out
         return csi;
+            } catch (DroppingSubscriptionHandlerException e) {
+                Logger.minor(this, "Caught "+e);
+            }
+        }
     }

     /**
@@ -75,7 +71,11 @@
      * subscription.
      */
     private synchronized SubscriptionHandler makeSubscription(PublishStreamKey 
key) {
-        SubscriptionHandler sub = (SubscriptionHandler) 
subscriptionsByKey.get(key);
+        SubscriptionHandler sub;
+        while(true) {
+            try {
+                synchronized(this) {
+                    sub = (SubscriptionHandler) subscriptionsByKey.get(key);
         if(sub != null) return sub;
         if(subscriptionsByKey.size() >= MAX_COUNT) {
             Logger.normal(this, "Rejecting subscription for "+key);
@@ -83,39 +83,113 @@
         }

         // Make a new one
-        sub = new SubscriptionHandler(key);
+                    sub = new SubscriptionHandler(this, key, node.random);
         subscriptionsByKey.put(key, sub);
-        return sub;
     }
-
-    public synchronized void remove(ClientSubscriptionHandler handler) {
-        PublishStreamKey key = handler.getKey().getKey();
-        SubscriptionHandler sub = (SubscriptionHandler) 
subscriptionsByKey.get(key);
-        ClientSubscriptionHandler oldHandler = sub.setLocal(handler);
-        if(oldHandler != null && oldHandler != handler) {
-            Logger.error(this, "Already had a different handler: 
"+oldHandler+" should be "+handler);
+                sub.forceSubscribe();
+                return sub;
+            } catch (DroppingSubscriptionHandlerException e) {
+                Logger.minor(this, "Caught "+e+" in makeSubscription");
         }
-        if(sub.shouldDrop()) {
-            drop(sub, key);
         }
     }

-    private synchronized void drop(SubscriptionHandler sub, PublishStreamKey 
key) {
-        subscriptionsByKey.remove(key);
+    synchronized void drop(SubscriptionHandler sub, PublishStreamKey key) {
+        SubscriptionHandler oldSub = 
+            (SubscriptionHandler) subscriptionsByKey.remove(key);
+        if(oldSub != sub) {
+            Logger.error(this, "Dropping "+sub+" but remove returned "+oldSub);
+        }
     }

     /**
-     * Handle a received packet.
+     * Handle a PublishData packet:
+     * <ul><li>If we do not have the key, return an error.</li>
+     * <li>If we have the key, and have the same packet, with the
+     * same contents, return FNPPublishSucceeded.</li>
+     * <li>If we have the key, and have the same packet with
+     * different contents, return FNPPublishCollision with a suggested
+     * next packet number.</li>
+     * <li>If we are not the root, forward to our parent, and wait
+     * for FNPPublishSucceeded or FNPPublishCollision.</li>
+     * <li>If we are the root, and we don't have that packet number,
+     * accept it; FNPPublishSucceeded. Then broadcast it to all other
+     * nodes on the tree.</li></ul>
+     * @return True unless we want the message to be put back
+     * onto the queue.
      */
-    public void receivedPacket(PublishStreamKey key, long packetNumber, byte[] 
packetData, PeerNode source) {
+    public boolean handlePublishData(Message m) {
+        // Firstly, do we have the key?
+        PublishStreamKey key = (PublishStreamKey) m.getObject(DMT.KEY);
+        PeerNode source = (PeerNode) m.getSource();
+        long id = m.getLong(DMT.UID);
+        if(!node.lockUID(id) || node.recentlyCompleted(id)) {
+            Logger.error(this, "Rejected "+id+" - loop");
+            try {
+                Message loop = DMT.createFNPRejectedLoop(id);
+                source.sendAsync(loop, null);
+            } catch (NotConnectedException e) {
+                Logger.error(this, "Not connected sending reject:loop on "+id);
+            }
+            return true;
+        }
+        try {
         SubscriptionHandler sub;
         synchronized(this) {
             sub = (SubscriptionHandler) subscriptionsByKey.get(key);
         }
         if(sub == null) {
-            Logger.normal(this, "Dropped sub packet from "+source+" on "+key+" 
- not subscribed");
-            return;
+                // We do not have the key!
+                Message reply = DMT.createFNPPublishDataNotSubscribed(id);
+                source.sendAsync(reply, null);
+                return true;
+            }
+            return sub.handlePublishData(id, source, m);
+        } catch (Throwable t) {
+            node.unlockUID(id);
+            node.completed(id);
+            Logger.error(this, "Caught "+t, t);
+            return true;
         }
-        sub.processPacket(packetNumber, packetData, source);
     }
+
+    /**
+     * Handle a SubscribeRequest.
+     * <p>The request UID is locked first; if it cannot be locked, or was
+     * recently completed, the request is treated as a loop and an
+     * FNPRejectedLoop is sent back to the source. Otherwise the
+     * SubscriptionHandler for the key is looked up, created if we are
+     * still below MAX_COUNT streams, and the message is handed to it.
+     * If we are already at MAX_COUNT and do not know the key, the request
+     * is dropped and the UID is released.
+     * @param m The incoming message; DMT.KEY and DMT.UID are read from it.
+     * @return True unless we want the message to be put back
+     * onto the queue. This implementation always returns true.
+     */
+    public boolean handleSubscribeRequest(Message m) {
+        // Do we have the stream in question, firstly?
+        PublishStreamKey key = (PublishStreamKey) m.getObject(DMT.KEY);
+        PeerNode source = (PeerNode) m.getSource();
+        long id = m.getLong(DMT.UID);
+        // NOTE(review): if lockUID succeeded but recentlyCompleted is true,
+        // the UID is not unlocked on this path - confirm that is intended.
+        if(!node.lockUID(id) || node.recentlyCompleted(id)) {
+            Logger.error(this, "Rejected "+id+" - loop");
+            try {
+                Message loop = DMT.createFNPRejectedLoop(id);
+                source.sendAsync(loop, null);
+            } catch (NotConnectedException e) {
+                Logger.error(this, "Not connected sending reject:loop on "+id);
+            }
+            return true;
+        }
+        try {
+            SubscriptionHandler sub;
+            synchronized(this) {
+                sub = (SubscriptionHandler) subscriptionsByKey.get(key);
+                if(sub == null && subscriptionsByKey.size() < MAX_COUNT) {
+                    sub = new SubscriptionHandler(this, key, node.random);
+                    subscriptionsByKey.put(key, sub);
+                }
+            }
+            if(sub == null) {
+                // Unknown key and no room for another stream. Previously this
+                // fell through to a NullPointerException that was caught and
+                // logged as an error below; handle it explicitly instead,
+                // with the same cleanup.
+                Logger.normal(this, "Rejecting subscription for "+key);
+                node.unlockUID(id);
+                node.completed(id);
+                return true;
+            }
+            sub.handleSubscribeRequest(m);
+        } catch (Throwable t) {
+            // Release the UID so a failed request does not keep it locked.
+            node.unlockUID(id);
+            node.completed(id);
+            Logger.error(this, "Caught "+t+" handling "+m, t);
+        }
+        return true;
+    }
 }

Modified: 
branches/publish-subscribe/freenet/src/freenet/support/NumberedRecentItems.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/support/NumberedRecentItems.java 
    2005-10-12 18:56:58 UTC (rev 7424)
+++ 
branches/publish-subscribe/freenet/src/freenet/support/NumberedRecentItems.java 
    2005-10-12 19:50:08 UTC (rev 7425)
@@ -80,8 +80,8 @@
         };
     }

-    public synchronized NumberedItem get(int num) {
-        int x = java.util.Arrays.binarySearch(items, new Integer(num), 
myComparator);
+    public synchronized NumberedItem get(long num) {
+        int x = java.util.Arrays.binarySearch(items, new Long(num), 
myComparator);
         if(x >= 0) return items[x];
         return null;
     }
@@ -162,4 +162,46 @@
         System.arraycopy(items, firstGreater, out, 0, 
items.length-firstGreater);
         return out;
     }
+
+    /**
+     * Return the number of the item held in the last slot of the
+     * items array (presumably the array is kept sorted so that slot
+     * holds the highest number - confirm).
+     * @return The number of the item with the highest value
+     * so far. If the list is completely empty this will throw
+     * an NPE.
+     */
+    public synchronized long getLastNumber() {
+        return items[items.length-1].getNumber();
+    }
+
+    /**
+     * Find the end of the first contiguous run of item numbers.
+     * Null slots are skipped. The first non-null item starts the run;
+     * each following non-null item must be exactly one greater than the
+     * previous one, otherwise the scan stops.
+     * NOTE(review): the comparison is plain long arithmetic; no explicit
+     * sequence-number wraparound handling is visible here - confirm
+     * whether callers expect any.
+     * @return The last sequence number we received, for purposes of subscribing.
+     * This is not necessarily the greatest seq# we have seen.
+     * -1 if there are no items at all (negative values are invalid; this
+     * means we want everything, in order); the highest number if all items
+     * are contiguous; otherwise the number just before the first gap.
+     */
+    public synchronized long getLastContiguousSeqNum() {
+        checkSorted(); // might as well
+        long last = -1;
+        boolean seenFirst = false;
+        for(int idx=0;idx<items.length;idx++) {
+            NumberedItem current = items[idx];
+            if(current == null)
+                continue; // empty slot, ignore
+            long number = current.getNumber();
+            // Once the run has started, any gap ends it.
+            if(seenFirst && number != last+1)
+                return last;
+            last = number;
+            seenFirst = true;
+        }
+        return last;
+    }
 }


Reply via email to