Author: toad
Date: 2005-10-13 17:33:49 +0000 (Thu, 13 Oct 2005)
New Revision: 7431

Modified:
   branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
Log:
Now only need to implement the remaining actual callbacks.

Modified: branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java   2005-10-13 16:20:26 UTC (rev 7430)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java   2005-10-13 17:33:49 UTC (rev 7431)
@@ -100,7 +100,7 @@
      * Send the current status to the source node.
      * @return True unless we disconnected.
      */
-    private boolean sendStatusMessage(boolean subscribed, double rootLoc) {
+    boolean sendStatusMessage(boolean subscribed, double rootLoc) {
         Message msg;
         if(subscribed)
             msg = DMT.createFNPSubscribeSucceeded(origID, counter++, rootLoc);

Modified: branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java    2005-10-13 16:20:26 UTC (rev 7430)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java    2005-10-13 17:33:49 UTC (rev 7431)
@@ -127,6 +127,7 @@
                 // :(
                 // Timeout or rejected:overload
                 Logger.error(this, "Timeout or rejected:overload in restarting: "+msg+" for "+id+" to "+next);
+                handler.senderRejectedOverload();
                 setStatus(REJECTED_OVERLOAD);
                 return;
             }
@@ -134,6 +135,7 @@
             if(msg.getSpec() == DMT.FNPRouteNotFound) {
                 // It didn't find anywhere closer than us or someone behind us :(
                Logger.minor(this, msg.toString()+" in runPhase2Restarting");
+               handler.senderRestartingSearchingRNF();
                setStatus(SEARCHING);
                return;
             }
@@ -186,11 +188,14 @@
                        continue;
                 } else if(msg.getSpec() == DMT.FNPSubscribeRestarted) {
                        Logger.debug(this, "Restarting");
+                       long restartID = msg.getLong(DMT.RESTART_UID);
+                    handler.subscribeRestarted(next, restartID);
                     setStatus(RESTARTING);
                     return;
                 } else if(msg.getSpec() == DMT.FNPRejectedOverload) {
                        Logger.debug(this, "Rejected: overload while in succeeded on "+this);
-                       setStatus(SEARCHING);
+                    handler.senderRejectedOverload();
+                       setStatus(REJECTED_OVERLOAD);
                        return;
                 }
                 Logger.error(this, "WTF?: "+msg);
@@ -211,6 +216,8 @@
         while(true) {
             // Check HTL > 0
             if(htl <= 0) {
+               // REDFLAG: We use RNF here... even though really it's a DNF
+               handler.senderRNF();
                 setStatus(ROUTE_NOT_FOUND);
                 return;
             }
@@ -223,6 +230,7 @@
             
             if(next == null) {
                 // Backtrack
+               handler.senderRNF();
                 setStatus(ROUTE_NOT_FOUND);
                 return;
             }
@@ -277,6 +285,7 @@
             if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
                 // Timeout or rejected:overload
                 Logger.error(this, "Timeout or rejected:overload: "+msg+" for "+id+" to "+next);
+                handler.senderRejectedOverload();
                 setStatus(REJECTED_OVERLOAD);
                 return;
             }
@@ -328,7 +337,6 @@
     public void setStatus(int newStatus) {
        Logger.debug(this, "Setting status to "+newStatus+" on "+this);
         status = newStatus;
-        handler.statusChange(this, status);
     }
     
 }

Modified: branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java        2005-10-13 16:20:26 UTC (rev 7430)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java        2005-10-13 17:33:49 UTC (rev 7431)
@@ -29,6 +29,7 @@
     final Node node;
     final SubscriptionManager manager;
     final PublishStreamKey key;
+    final double target;
     final Random random;
     // Small, rarely modified => []
     private ClientSubscriptionHandler[] localSubscribers;
@@ -79,6 +80,7 @@
         upstreamRestarting = false;
         upstreamRestartUID = -1;
         this.random = random;
+        target = key.toNormalizedDouble();
     }
 
     public synchronized boolean shouldDrop() {
@@ -122,12 +124,12 @@
             data = packetData;
     }
 
-    /**
+        /**
          * Forward this packet to a subscriber node.
          * As an FNPSubscribeData. NOT an FNPPublishData.
          * DO NOT CALL WITH LOCKS HELD, as sends packets.
          * @param pn Node to send to.
-     */
+         */
         public void forwardTo(PeerNode pn) throws NotConnectedException {
             Message msg = DMT.createFNPSubscribeData(key, packetNumber, data);
             pn.sendAsync(msg, null);
@@ -239,13 +241,13 @@
             PeerNode pn = sub.subscriber;
             if(pn != ignore) {
                 try {
-            item.forwardTo(pn);
+                       item.forwardTo(pn);
                 } catch (NotConnectedException e) {
                     Logger.minor(this, "Lost connection to subscriber peer "+pn+" when forwarding packet "+item);
                     removeSubscriberHandler(sub);
                 }
+            }
         }
-    }
         ClientSubscriptionHandler[] handlers;
         synchronized(this) {
             handlers = localSubscribers;
@@ -285,138 +287,189 @@
 
         public void run() {
             try {
-            Logger.minor(this, "Running "+this);
-            PeerNode myParent;
-            while(true) {
-            synchronized(this) {
-                myParent = parent;
-                if(restarting) myParent = null;
-                if(upstreamRestarting) myParent = null;
-            }
-            if(myParent == null) {
-                // We just became root, or we restarted
-                handlePublishData(uid, source, origMessage);
-                return;
-            }
-            // Send upwards
-            Message msg = DMT.createFNPPublishData(contents, key, 
packetNumber, uid);
-            try {
-                myParent.sendAsync(msg, null);
-            } catch (NotConnectedException e) {
-                lostConnection();
-                return;
-            }
-            // Wait for reply
-            MessageFilter mfSuccess = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataSucceeded);
-            MessageFilter mfRestarting = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataRejectedRestarting);
-            MessageFilter mfCollision = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataCollision);
-            MessageFilter mfInvalid = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataInvalid);
-            MessageFilter mfOverload = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPRejectedOverload);
-            MessageFilter mfLoop = 
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID, 
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPRejectedLoop);
-            
-            MessageFilter mf = 
mfSuccess.or(mfRestarting.or(mfCollision.or(mfInvalid.or(mfOverload.or(mfLoop)))));
-            
-            Message reply;
-            
-            try {
-                reply = source.node.usm.waitFor(mf);
-            } catch (DisconnectedException e1) {
-                lostConnection();
-            return;
-        }
-            
-            if(reply == null && !myParent.isConnected()) {
-                Logger.error(this, "Did not get notified of parent 
disconnection!");
-                lostConnection();
-                return;
-            }
-            
-            if(reply == null || reply.getSpec() == DMT.FNPRejectedOverload) {
-                // Timeout
-                Logger.error(this, "Timeout - "+myParent+" did not respond in 
"+TIMEOUT+"ms forwarding PublishData");
-                try {
-                    source.sendAsync(DMT.createFNPRejectedOverload(uid), null);
-                } catch (NotConnectedException e1) {
-                    Logger.normal(this, "Disconnected while sending rejected 
due to overload: "+
-                            source+" ("+key+","+packetNumber+")");
-                }
-                return;
-            }
-            
-            if(reply.getSpec() == DMT.FNPRejectedLoop) {
-        synchronized(this) {
-                    if(myParent != parent) continue;
-                }
-                Logger.error(this, "Loop rejection in forwarding PublishData 
to parent!");
-                // Tree topology is completely broken
-                forceSubscribe();
-                try {
-                    
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid), null);
-                } catch (NotConnectedException e) {
-                    Logger.normal(this, "Disconnected while sending rejection 
due to restart due to loop: "+
-                            source+" ("+key+","+packetNumber+")");
-                }
-                return;
-        }
-        
-            if(reply.getSpec() == DMT.FNPPublishDataInvalid) {
-                // FIXME check it
-                try {
-                    source.sendAsync(DMT.createFNPPublishDataInvalid(uid), 
null);
-                } catch (NotConnectedException e) {
-                    Logger.normal(this, "Disconnected while sending invalid 
data rejection: "+
-                            source+" ("+key+","+packetNumber+")");
-                }
-                return;
-    }
+                               Logger.minor(this, "Running " + this);
+                               PeerNode myParent;
+                               while (true) {
+                                       synchronized (this) {
+                                               myParent = parent;
+                                               if (restarting)
+                                                       myParent = null;
+                                               if (upstreamRestarting)
+                                                       myParent = null;
+                                       }
+                                       if (myParent == null) {
+                                               // We just became root, or we 
restarted
+                                               handlePublishData(uid, source, 
origMessage);
+                                               return;
+                                       }
+                                       // Send upwards
+                                       Message msg = 
DMT.createFNPPublishData(contents, key,
+                                                       packetNumber, uid);
+                                       try {
+                                               myParent.sendAsync(msg, null);
+                                       } catch (NotConnectedException e) {
+                                               lostConnection();
+                                               return;
+                                       }
+                                       // Wait for reply
+                                       MessageFilter mfSuccess = 
MessageFilter.create()
+                                                       
.setTimeout(TIMEOUT).setField(DMT.UID, uid)
+                                                       
.setSource(myParent).setMatchesDroppedConnection(
+                                                                       
true).setType(DMT.FNPPublishDataSucceeded);
+                                       MessageFilter mfRestarting = 
MessageFilter.create()
+                                                       
.setTimeout(TIMEOUT).setField(DMT.UID, uid)
+                                                       
.setSource(myParent).setMatchesDroppedConnection(
+                                                                       
true).setType(
+                                                                       
DMT.FNPPublishDataRejectedRestarting);
+                                       MessageFilter mfCollision = 
MessageFilter.create()
+                                                       
.setTimeout(TIMEOUT).setField(DMT.UID, uid)
+                                                       
.setSource(myParent).setMatchesDroppedConnection(
+                                                                       
true).setType(DMT.FNPPublishDataCollision);
+                                       MessageFilter mfInvalid = 
MessageFilter.create()
+                                                       
.setTimeout(TIMEOUT).setField(DMT.UID, uid)
+                                                       
.setSource(myParent).setMatchesDroppedConnection(
+                                                                       
true).setType(DMT.FNPPublishDataInvalid);
+                                       MessageFilter mfOverload = 
MessageFilter.create()
+                                                       
.setTimeout(TIMEOUT).setField(DMT.UID, uid)
+                                                       
.setSource(myParent).setMatchesDroppedConnection(
+                                                                       
true).setType(DMT.FNPRejectedOverload);
+                                       MessageFilter mfLoop = 
MessageFilter.create().setTimeout(
+                                                       
TIMEOUT).setField(DMT.UID, uid).setSource(myParent)
+                                                       
.setMatchesDroppedConnection(true).setType(
+                                                                       
DMT.FNPRejectedLoop);
 
-            if(reply.getSpec() == DMT.FNPPublishDataRejectedRestarting) {
-                // FIXME enforcement - has it recently said it's restarting?
-                try {
-                    
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid), null);
-                } catch (NotConnectedException e) {
-                    Logger.normal(this, "Disconnected while relaying 
rejected:restarting: "+
-                            source+" ("+key+","+packetNumber+")");
-                }
-                return;
-            }
+                                       MessageFilter mf = 
mfSuccess.or(mfRestarting.or(mfCollision
+                                                       
.or(mfInvalid.or(mfOverload.or(mfLoop)))));
 
-            if(reply.getSpec() == DMT.FNPPublishDataCollision) {
-                // Yay finally a nontrivial failure mode!
-                long suggested = reply.getLong(DMT.SUGGESTED_SEQNO);
-                suggested = Math.max(suggested, packets.getLastNumber()+1);
-                try {
-                    source.sendAsync(DMT.createFNPPublishDataCollision(uid, 
suggested), null);
-                } catch (NotConnectedException e) {
-                    Logger.normal(this, "Not connected while sending 
collision: "+
-                            source+" ("+key+","+packetNumber+")");
-                }
-                return;
-            }
-            
-            if(reply.getSpec() == DMT.FNPPublishDataSucceeded) {
-                // Success!
-                PacketItem item = new PacketItem(packetNumber, contents);
-                packets.add(item);
-                try {
-                    source.sendAsync(DMT.createFNPPublishDataSucceeded(uid), 
null);
-                } catch (NotConnectedException e) {
-                    Logger.normal(this, "Disconnected while adding new packet: 
"+
-                            source+" ("+key+","+packetNumber+")");
-                }
-                broadcast(item, source);
-                return;
-            }
-            
-            Logger.error(this, "Unrecognized packet: "+reply);
-            return;
-            }
-            } catch (Throwable t) {
-                Logger.error(this, "Caught "+t, t);
-            } finally {
-                source.node.unlockUID(uid);
-                source.node.completed(uid);
-            }
+                                       Message reply;
+
+                                       try {
+                                               reply = 
source.node.usm.waitFor(mf);
+                                       } catch (DisconnectedException e1) {
+                                               lostConnection();
+                                               return;
+                                       }
+
+                                       if (reply == null && 
!myParent.isConnected()) {
+                                               Logger
+                                                               .error(this,
+                                                                               
"Did not get notified of parent disconnection!");
+                                               lostConnection();
+                                               return;
+                                       }
+
+                                       if (reply == null
+                                                       || reply.getSpec() == 
DMT.FNPRejectedOverload) {
+                                               // Timeout
+                                               Logger.error(this, "Timeout - " 
+ myParent
+                                                               + " did not 
respond in " + TIMEOUT
+                                                               + "ms 
forwarding PublishData");
+                                               try {
+                                                       source.sendAsync(
+                                                                       
DMT.createFNPRejectedOverload(uid), null);
+                                               } catch (NotConnectedException 
e1) {
+                                                       Logger.normal(this,
+                                                                       
"Disconnected while sending rejected due to overload: "
+                                                                               
        + source + " (" + key + ","
+                                                                               
        + packetNumber + ")");
+                                               }
+                                               return;
+                                       }
+
+                                       if (reply.getSpec() == 
DMT.FNPRejectedLoop) {
+                                               synchronized (this) {
+                                                       if (myParent != parent)
+                                                               continue;
+                                               }
+                                               Logger.error(this,
+                                                                               
"Loop rejection in forwarding PublishData to parent!");
+                                               // Tree topology is completely 
broken
+                                               forceSubscribe();
+                                               try {
+                                                       
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid),
+                                                                               
        null);
+                                               } catch (NotConnectedException 
e) {
+                                                       Logger.normal(this,
+                                                                       
"Disconnected while sending rejection due to restart due to loop: "
+                                                                               
        + source + " (" + key + ","
+                                                                               
        + packetNumber + ")");
+                                               }
+                                               return;
+                                       }
+
+                                       if (reply.getSpec() == 
DMT.FNPPublishDataInvalid) {
+                                               // FIXME check it
+                                               try {
+                                                       source.sendAsync(DMT
+                                                                       
.createFNPPublishDataInvalid(uid), null);
+                                               } catch (NotConnectedException 
e) {
+                                                       Logger.normal(this,
+                                                                       
"Disconnected while sending invalid data rejection: "
+                                                                               
        + source + " (" + key + ","
+                                                                               
        + packetNumber + ")");
+                                               }
+                                               return;
+                                       }
+
+                                       if (reply.getSpec() == 
DMT.FNPPublishDataRejectedRestarting) {
+                                               // FIXME enforcement - has it 
recently said it's
+                                               // restarting?
+                                               try {
+                                                       source
+                                                                       
.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid),
+                                                                               
        null);
+                                               } catch (NotConnectedException 
e) {
+                                                       Logger.normal(this,
+                                                                       
"Disconnected while relaying rejected:restarting: "
+                                                                               
        + source + " (" + key + ","
+                                                                               
        + packetNumber + ")");
+                                               }
+                                               return;
+                                       }
+
+                                       if (reply.getSpec() == 
DMT.FNPPublishDataCollision) {
+                                               // Yay finally a nontrivial 
failure mode!
+                                               long suggested = 
reply.getLong(DMT.SUGGESTED_SEQNO);
+                                               suggested = Math.max(suggested,
+                                                               
packets.getLastNumber() + 1);
+                                               try {
+                                                       
source.sendAsync(DMT.createFNPPublishDataCollision(
+                                                                       uid, 
suggested), null);
+                                               } catch (NotConnectedException 
e) {
+                                                       Logger.normal(this,
+                                                                       "Not 
connected while sending collision: "
+                                                                               
        + source + " (" + key + ","
+                                                                               
        + packetNumber + ")");
+                                               }
+                                               return;
+                                       }
+
+                                       if (reply.getSpec() == 
DMT.FNPPublishDataSucceeded) {
+                                               // Success!
+                                               PacketItem item = new 
PacketItem(packetNumber, contents);
+                                               packets.add(item);
+                                               try {
+                                                       source.sendAsync(DMT
+                                                                       
.createFNPPublishDataSucceeded(uid), null);
+                                               } catch (NotConnectedException 
e) {
+                                                       Logger.normal(this,
+                                                                       
"Disconnected while adding new packet: "
+                                                                               
        + source + " (" + key + ","
+                                                                               
        + packetNumber + ")");
+                                               }
+                                               broadcast(item, source);
+                                               return;
+                                       }
+
+                                       Logger.error(this, "Unrecognized 
packet: " + reply);
+                                       return;
+                               }
+                       } catch (Throwable t) {
+                               Logger.error(this, "Caught " + t, t);
+                       } finally {
+                               source.node.unlockUID(uid);
+                               source.node.completed(uid);
+                       }
         }
 
         private void lostConnection() {
@@ -431,14 +484,16 @@
         
         }
 
-        /**
-     * Handle a SubscribeRequest. Do not route it. Add the
-     * node to the list of subscriber nodes and send it our status.
-     * Caller should have checked ID for loops.
-     * @param m The message to reply to.
-     * @throws DroppingSubscriptionHandlerException If the node is currently being dropped
-     * from the parent.
-         */
+    /**
+     * Handle a SubscribeRequest. Do not route it. Add the node to the list
+     * of subscriber nodes and send it our status. Caller should have
+     * checked ID for loops.
+     * 
+     * @param m
+     *            The message to reply to.
+     * @throws DroppingSubscriptionHandlerException
+     *             If the node is currently being dropped from the parent.
+     */
     public void handleSubscribeRequest(Message m) throws DroppingSubscriptionHandlerException {
         long id = m.getLong(DMT.UID);
         short htl = m.getShort(DMT.HTL);
@@ -451,7 +506,6 @@
         double origNearest = m.getDouble(DMT.NEAREST_LOCATION);
         double nearest = origNearest;
         double myLoc = source.node.lm.getLocation().getValue();
-        double target = key.toNormalizedDouble();
         if(Math.abs(myLoc-target) < Math.abs(nearest-target)) {
             htl = Node.MAX_HTL;
             nearest = myLoc;
@@ -460,6 +514,7 @@
         }
             
         boolean nowRestarting = false;
+        boolean nowRejectingOrigCloserThanRoot = false;
         SubscribeHandler handler;
         try {
             synchronized(this) {
@@ -475,6 +530,14 @@
                 // Will tell client node current status
                 handler = new SubscribeHandler(node, id, htl, source, orderedMessagesSeq, origNearest, nearest, this, items);
                 
+                if(subscribed) {
+                       if(Math.abs(target - rootLocation) > Math.abs(target - origNearest)) {
+                               Logger.normal(this, "Rejecting subscription: our root "+rootLocation+" is further from target "+target+
+                                               " than their nearest-so-far: "+origNearest);
+                               nowRejectingOrigCloserThanRoot = true;
+                       }
+                }
+                
                 if((!subscribed) && (!restarting)) {
                     restarting = true;
                     nowRestarting = true;
@@ -490,6 +553,19 @@
             }
         }
         
+        if(nowRejectingOrigCloserThanRoot) {
+               // htl 0 because of root
+               Message msg = DMT.createFNPRouteNotFound(id, (short)0);
+               try {
+                               source.sendAsync(msg, null);
+                       } catch (NotConnectedException e) {
+                               // Will not happen often!
+                               Logger.minor(this, "Rejected because orig closer than root, but conn closed on "+id+" for "+this);
+                       }
+                       // There *may* be a more optimal route somewhere...
+                       maybeRestart();
+        }
+        
         try {
             if(items != null && items.length > 0) {
                 for(int i=0;i<items.length;i++) {
@@ -642,19 +718,9 @@
         }
     }
 
-    /**
-     * Callback from SubscribeSender, indicating a change in status.
-     * @param sender2
-     * @param status
-     */
-    void statusChange(SubscribeSender sender2, int status) {
-            // TODO Auto-generated method stub
-            
-        }
-
     synchronized long getRestartUID() {
         return restartUID;
-        }
+    }
 
     public synchronized double targetLocation() {
         return key.toNormalizedDouble();
@@ -689,4 +755,37 @@
     public long getLastSeqNum() {
         return packets.getLastContiguousSeqNum();
     }
+
+    /**
+     * A SubscribeRequest succeeded.
+     * @param next The node we are now subscribed through.
+     * @param rootLoc The tree root's location.
+     */
+       public void subscribeSucceeded(PeerNode next, double rootLoc) {
+               SubscribeHandler[] handlers;
+               synchronized(this) {
+                       parent = next;
+                       subscribed = true;
+                       rootLocation = rootLoc;
+                       handlers = subscriberPeerHandlers;
+                       for(int i=0;i<handlers.length;i++) {
+                               SubscribeHandler handler = handlers[i];
+                               if(Math.abs(handler.origNearestLoc-target) < Math.abs(rootLoc-target)) {
+                               // htl 0 because of root
+                               Message msg = DMT.createFNPRouteNotFound(handler.origID, (short)0);
+                               try {
+                                               handler.subscriber.sendAsync(msg, null);
+                                       } catch (NotConnectedException e) {
+                                               // Will not happen often!
+                                               Logger.minor(this, "Rejected because orig closer than root, but conn closed on "+handler.origID+" for "+this);
+                                       }
+                                       removeSubscriberHandler(handler);
+                                       continue;
+                               } else {
+                                       handler.sendStatusMessage(true, rootLoc);
+                                       // Stays on list because is subscribed
+                               }
+                       }
+               }
+       }
 }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to