Author: toad
Date: 2005-10-18 19:54:33 +0000 (Tue, 18 Oct 2005)
New Revision: 7438

Modified:
   branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
   branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
   branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionImpl.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscriptionCallback.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
   branches/publish-subscribe/freenet/src/freenet/node/TextModeClientInterface.java
Log:
Lots of changes. Just about compiles, except for resubscribe support.

Modified: branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java     
2005-10-18 14:14:07 UTC (rev 7437)
+++ branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java     
2005-10-18 19:54:33 UTC (rev 7438)
@@ -934,6 +934,18 @@
         return msg;
     }
 
+    public static final MessageType FNPUnsubscribe = new 
MessageType("FNPUnsubscribe") {{
+       addField(UID, Long.class);
+    }};
+    
+
+       public static Message createFNPUnsubscribe(long id, int counter) {
+               Message msg = new Message(FNPUnsubscribe);
+               msg.set(UID, id);
+               msg.set(ORDERED_MESSAGE_NUM, counter);
+               return msg;
+       }
+    
     public static final MessageType FNPPublishDataCollision = new 
MessageType("FNPPublishDataCollision") {{
         addField(UID, Long.class);
         addField(SUGGESTED_SEQNO, Long.class);
@@ -963,5 +975,4 @@
 //    }
 //    
        public static void init() { }
-
 }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
  2005-10-18 14:14:07 UTC (rev 7437)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
  2005-10-18 19:54:33 UTC (rev 7438)
@@ -56,4 +56,25 @@
             impl.processPacket(packetNumber, finalData);
         }
     }
+
+       public synchronized void notifyConnected() {
+               for(Iterator i=clientSubs.iterator();i.hasNext();) {
+                       ClientSubscriptionImpl sub = (ClientSubscriptionImpl) 
i.next();
+                       sub.notifyConnected();
+               }
+       }
+
+       public synchronized void forceDisconnect() {
+               for(Iterator i=clientSubs.iterator();i.hasNext();) {
+                       ClientSubscriptionImpl sub = (ClientSubscriptionImpl) 
i.next();
+                       sub.forceDisconnect();
+               }
+       }
+
+       public synchronized void notifyDisconnected() {
+               for(Iterator i=clientSubs.iterator();i.hasNext();) {
+                       ClientSubscriptionImpl sub = (ClientSubscriptionImpl) 
i.next();
+                       sub.notifyDisconnected();
+               }
+       }
 }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionImpl.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionImpl.java 
    2005-10-18 14:14:07 UTC (rev 7437)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionImpl.java 
    2005-10-18 19:54:33 UTC (rev 7438)
@@ -46,4 +46,28 @@
         }
     }
 
+       public void notifyConnected() {
+               try {
+                       cb.connected();
+               } catch (Throwable t) {
+                       Logger.error(this, "Caught "+t+" from callback in 
notifyConnected on "+this, t);
+               }
+       }
+
+       public void forceDisconnect() {
+               try {
+                       cb.lostConnectionFatal();
+               } catch (Throwable t) {
+                       Logger.error(this, "Caught "+t+" from callback in 
forceDisconnect on "+this, t);
+               }
+               disconnect();
+       }
+
+       public void notifyDisconnected() {
+               try {
+                       cb.lostConnection();
+               } catch (Throwable t) {
+                       Logger.error(this, "Caught "+t+" from callback in 
notifyDisconnected on "+this, t);
+               }
+       }
 }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java    
2005-10-18 14:14:07 UTC (rev 7437)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java    
2005-10-18 19:54:33 UTC (rev 7438)
@@ -20,6 +20,7 @@
     private short htl;
     final PeerNode source;
     final SubscriptionHandler handler;
+    final SubscribeHandler origSubscriber;
     final double nearestSoFar;
     private final HashSet nodesRoutedTo;
     private PeerNode next;
@@ -56,8 +57,10 @@
      * @param source The source node. Null if this is locally originated.
      * @param handler The SubscriptionHandler which created us, and who want 
to know 
      * when our status changes.
+     * @param sub The SubscribeHandler which originated this SubscribeSender, 
if any.
+     * Null if this is a locally originated subscription.
      */
-    public SubscribeSender(long id, short htl, PeerNode source, double 
nearestSoFar, SubscriptionHandler handler) {
+    public SubscribeSender(long id, short htl, PeerNode source, double 
nearestSoFar, SubscriptionHandler handler, SubscribeHandler sub) {
         this.id = id;
         this.htl = htl;
         this.source = source;
@@ -67,6 +70,7 @@
         node = handler.node;
         status = SEARCHING;
         nodesRoutedTo = new HashSet();
+        origSubscriber = sub;
     }
 
     public void run() {
@@ -146,13 +150,18 @@
                 if(rootLoc < 0.0 || rootLoc > 1.0) {
                     Logger.error(this, "Invalid root loc: "+rootLoc+" from 
"+next);
                     msg = DMT.createFNPUnsubscribe(id, counter);
-                    next.sendAsync(msg, null);
-                    continue;
+                    try {
+                                               next.sendAsync(msg, null);
+                                       } catch (NotConnectedException e) {
+                                               Logger.minor(this, "Not 
connected sending unsub (invalid root loc "+rootLoc+") to "+next);
+                                       }
+                                       handler.invalidSuccessInRestarting();
+                    setStatus(SEARCHING);
                 }
                 // Validated success
-                handler.subscribeSucceeded(next, rootLoc);
+                handler.subscribeSucceeded(next, rootLoc, 
(short)(origSubscriber.origHTL - htl));
                 setStatus(SUCCESS);
-                break;
+                return;
             }
             
         } catch (DisconnectedException e) {
@@ -272,7 +281,7 @@
             MessageFilter mfSubscribeRestarted = 
MessageFilter.create().setField(DMT.UID, 
id).setSource(next).setType(DMT.FNPSubscribeRestarted).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
 counter);
             counter++;
             
-            MessageFilter mf = 
mfRejectedLoop.or(mfRejectedOverload.or(mfSubscribeSucceeded.or(mfSubscribeRestarted)));
+            MessageFilter mf = 
mfRejectedLoop.or(mfRejectedOverload.or(mfSubscribeSucceeded.or(mfSubscribeRestarted.or(mfRouteNotFound))));
             
             try {
                 msg = node.usm.waitFor(mf);
@@ -302,11 +311,15 @@
                 if(rootLoc < 0.0 || rootLoc > 1.0) {
                     Logger.error(this, "Invalid root loc: "+rootLoc+" from 
"+next);
                     msg = DMT.createFNPUnsubscribe(id, counter);
-                    next.sendAsync(msg, null);
+                    try {
+                                               next.sendAsync(msg, null);
+                                       } catch (NotConnectedException e) {
+                                               Logger.error(this, 
"NotConnectedException sending "+msg+" to "+next);
+                                       }
                     continue;
                 }
                 // Validated success
-                handler.subscribeSucceeded(next, rootLoc);
+                handler.subscribeSucceeded(next, rootLoc, 
(short)(origSubscriber.origHTL - htl));
                 setStatus(SUCCESS);
                 break;
             }
@@ -338,5 +351,9 @@
        Logger.debug(this, "Setting status to "+newStatus+" on "+this);
         status = newStatus;
     }
+
+       public short getHTL() {
+               return htl;
+       }
     
 }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionCallback.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionCallback.java   
    2005-10-18 14:14:07 UTC (rev 7437)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionCallback.java   
    2005-10-18 19:54:33 UTC (rev 7438)
@@ -5,12 +5,19 @@
  */
 public interface SubscriptionCallback {
 
+       /** Received a packet */
     void got(long packetNumber, byte[] data);
     
+    /** Lost the subscription */
     void lostConnection();
     
+    /** Permanently lost the connection */
+    void lostConnectionFatal();
+    
+    /** Restarted the subscription */
     void restarted();
     
+    /** Connected to the stream */
     void connected();
     
 }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java    
    2005-10-18 14:14:07 UTC (rev 7437)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java    
    2005-10-18 19:54:33 UTC (rev 7438)
@@ -527,7 +527,7 @@
                 // Collect the old packets to send when accepted
                 items = packets.getAfter(lastSeen);
                 
-                // Will tell client node current status
+                // Will tell client node current status, and add self to us
                 handler = new SubscribeHandler(node, id, htl, source, 
orderedMessagesSeq, origNearest, nearest, this, items);
                 
                 if(subscribed) {
@@ -579,7 +579,7 @@
         }
     }
 
-    /**
+       /**
      * Is the node already subscribed?
      */
     private synchronized boolean isSubscribed(PeerNode source) {
@@ -594,14 +594,14 @@
         short htl = handler.origHTL;
         PeerNode source = handler.subscriber;
         double nearest = handler.origNearestLoc;
-        startSubscribe(id, htl, nearest, source);
+        startSubscribe(id, htl, nearest, source, handler);
     }
 
-    private void startSubscribe(long id, short htl, double nearest, PeerNode 
source) {
+    private void startSubscribe(long id, short htl, double nearest, PeerNode 
source, SubscribeHandler sub) {
         synchronized(this) {
             if(subscribed) return;
             if(sender != null) return;
-            sender = new SubscribeSender(id, htl, source, nearest, this);
+            sender = new SubscribeSender(id, htl, source, nearest, this, sub);
         }
         Thread t = new Thread(sender);
         t.setDaemon(true);
@@ -711,7 +711,7 @@
                     restarting = true;
                 }
             }
-            startSubscribe(random.nextLong(), Node.MAX_HTL, 
node.lm.getLocation().getValue(), null);
+            startSubscribe(random.nextLong(), Node.MAX_HTL, 
node.lm.getLocation().getValue(), null, null);
         } finally {
             if(restartingNow && sender == null)
                 restarting = false;
@@ -719,7 +719,10 @@
     }
 
     synchronized long getRestartUID() {
-        return restartUID;
+       if(upstreamRestarting)
+               return upstreamRestartUID;
+       else
+               return restartUID;
     }
 
     public synchronized double targetLocation() {
@@ -738,6 +741,8 @@
      * returns our location.
      */
     public synchronized double rootLoc() {
+       if(isRoot())
+               return node.lm.getLocation().getValue();
         return rootLocation;
     }
 
@@ -761,31 +766,214 @@
      * @param next The node we are now subscribed through.
      * @param rootLoc The tree root's location.
      */
-       public void subscribeSucceeded(PeerNode next, double rootLoc) {
-               SubscribeHandler[] handlers;
+       public void subscribeSucceeded(PeerNode next, double rootLoc, short 
htlDiff) {
+               ClientSubscriptionHandler[] clients;
                synchronized(this) {
+                       SubscribeHandler[] handlers;
                        parent = next;
                        subscribed = true;
+                       // Do not null sender, as still using it; it persists 
for as long as the subscription does
+                       restarting = false;
                        rootLocation = rootLoc;
                        handlers = subscriberPeerHandlers;
-                       for(int i=0;i<handlers.length;i++) {
-                               SubscribeHandler handler = handlers[i];
-                               if(Math.abs(handler.origNearestLoc-target) < 
Math.abs(rootLoc-target)) {
-                               // htl 0 because of root
-                               Message msg = 
DMT.createFNPRouteNotFound(handler.origID, (short)0);
-                               try {
-                                               
handler.subscriber.sendAsync(msg, null);
+                       if(handlers != null) {
+                               for(int i=0;i<handlers.length;i++) {
+                                       SubscribeHandler handler = handlers[i];
+                                       
if(Math.abs(handler.origNearestLoc-target) <= Math.abs(rootLoc-target)) {
+                                               // If ==, better to go to the 
old one.
+                                               // htl 0 because of root
+                                               Message msg = 
DMT.createFNPRouteNotFound(handler.origID, (short)Math.max(0, 
handler.origHTL-htlDiff));
+                                               try {
+                                                       
handler.subscriber.sendAsync(msg, null);
+                                               } catch (NotConnectedException 
e) {
+                                                       // Will not happen 
often!
+                                                       Logger.minor(this, 
"Rejected because orig closer than root, but conn closed on "+handler.origID+" 
for "+this);
+                                               }
+                                               
removeSubscriberHandler(handler);
+                                               continue;
+                                       } else {
+                                               handler.sendStatusMessage(true, 
rootLoc);
+                                               // Stays on list because is 
subscribed
+                                       }
+                               }
+                       }
+                       clients = localSubscribers;
+               }
+               for(int i=0;i<clients.length;i++) {
+                       clients[i].notifyConnected();
+               }
+       }
+
+       /** Our SubscribeSender could not find the stream it was trying to
+        * subscribe to. If we are closer to the target than the current
+        * SubscribeHandler's best-so-far, we become the root node. Otherwise
+        * we send it back an RNF, and possibly restart the subscription
+        * attempt with the next SubscribeHandler.
+        * @throws DroppingSubscriptionHandlerException 
+        */
+       void senderRNF() {
+               Logger.minor(this, "Sender could not find stream on "+this);
+               boolean becomeRoot = false;
+               short htlDiff = 0;
+               synchronized(this) {
+                       SubscribeHandler origSub = sender.origSubscriber;
+                       if(origSub == null) {
+                               sender = null;
+                               becomeRoot = true;
+                               htlDiff = 0;
+                       } else {
+                               double prevBest = origSub.origNearestLoc;
+                               double me = node.lm.getLocation().getValue();
+                               if(Math.abs(prevBest-target) > 
Math.abs(me-target)) {
+                                       // Succeed
+                                       sender = null;
+                                       becomeRoot = true;
+                                       htlDiff = (short) (sender.getHTL() - 
origSub.origHTL);
+                               } else {
+                                       // Fail: Send an RNF to the origSub.
+                                       Message rnf = 
DMT.createFNPRouteNotFound(origSub.origID, sender.getHTL());
+                                       try {
+                                               
origSub.subscriber.sendAsync(rnf, null);
                                        } catch (NotConnectedException e) {
-                                               // Will not happen often!
-                                               Logger.minor(this, "Rejected 
because orig closer than root, but conn closed on "+handler.origID+" for 
"+this);
+                                               // Doh
+                                               Logger.normal(this, "Lost 
connection to subscriber while telling it it failed: "+origSub+" on "+this);
                                        }
-                                       removeSubscriberHandler(handler);
-                                       continue;
-                               } else {
-                                       handler.sendStatusMessage(true, 
rootLoc);
-                                       // Stays on list because is subscribed
+                                       // Now, is there another one to try?
+                                       removeSubscriberHandler(origSub);
+                                       maybeRestartSenderOnError();
                                }
                        }
                }
+               if(becomeRoot)
+                       becomeRoot(htlDiff);
+               if(dropping)
+                       manager.drop(this, key);
        }
+
+       /**
+        * Restart the sender if possible, after an error, unless shouldDrop(),
+        * in which case set dropping.
+        */
+       private synchronized void maybeRestartSenderOnError() {
+               if(!shouldDrop()) {
+                       sender = null;
+                       if(subscriberPeerHandlers != null && 
subscriberPeerHandlers.length > 0) {
+                               SubscribeHandler handler = 
subscriberPeerHandlers[0];
+                               startSubscribe(handler);
+                       } else {
+                               // Local subscriptions
+                               try {
+                                       forceSubscribe();
+                               } catch (DroppingSubscriptionHandlerException 
e) {
+                                       // Hrrrrrmm...
+                                       if(shouldDrop()) {
+                                               Logger.normal(this, "Got "+e);
+                                       } else {
+                                               Logger.error(this, "Got: "+e+" 
but !shouldDrop() !!");
+                                       }
+                               }
+                               // If that fails, we become root
+                       }
+               } else {
+                       sender = null;
+                       subscribed = false;
+                       restarting = false;
+                       dropping = true;
+               }
+       }
+
+       /** Become root */
+       synchronized void becomeRoot(short htlDiff) {
+               subscribeSucceeded(null, node.lm.getLocation().getValue(), 
htlDiff);
+       }
+
+       /** Sender received RejectedOverload.
+        * Forward it to the original requestor, and run another one if 
possible.
+        * If this was a local request, tell all clients they are disconnected.
+        */
+       void senderRejectedOverload() {
+               ClientSubscriptionHandler[] clients;
+               synchronized(this) {
+                       SubscribeHandler origSub = sender.origSubscriber;
+                       if(origSub != null) {
+                               Message rejected = 
DMT.createFNPRejectedOverload(origSub.origID);
+                               try {
+                                       origSub.subscriber.sendAsync(rejected, 
null);
+                               } catch (NotConnectedException e) {
+                                       Logger.minor(this, "Failed to send 
RejectedOverload to "+origSub.subscriber+" for "+this);
+                               }
+                               removeSubscriberHandler(origSub);
+                               // Exactly the same as above - either restart 
or drop 
+                               maybeRestartSenderOnError();
+                               return;
+                       }
+                       clients = localSubscribers;
+                       localSubscribers = null;
+               }
+               // If still here, was local-only, so we cancel the local 
requests.
+               for(int i=0;i<clients.length;i++) {
+                       clients[i].forceDisconnect();
+               }
+       }
+
+       /** Upstream restarted */
+       void subscribeRestarted(PeerNode next, long restartID) {
+               SubscribeHandler[] subscribers;
+               synchronized(this) {
+                       parent = next;
+                       upstreamRestarting = true;
+                       upstreamRestartUID = restartID;
+                       subscribers = subscriberPeerHandlers;
+               }
+               for(int i=0;i<subscribers.length;i++)
+                       subscribers[i].sendStatusMessage(false, rootLoc());
+       }
+
+       /** Generic method for upstream restart failed */
+       private void upstreamRestartFailed() {
+               SubscribeHandler[] subscribers;
+               synchronized(this) {
+                       // Kill upstream restart
+                       upstreamRestarting = false;
+                       subscribers = subscriberPeerHandlers;
+               }
+               for(int i=0;i<subscribers.length;i++)
+                       subscribers[i].sendStatusMessage(false, rootLoc());
+       }
+       
+       /**
+        * Got an RNF while upstream was restarting.
+        * Just send our old restart ID out.
+        */
+       void senderRestartingSearchingRNF() {
+               upstreamRestartFailed();
+       }
+       
+       /** Lost connection to node which was restarting. */
+       void lostParentConnectionWasRestarting() {
+               upstreamRestartFailed();
+       }
+
+       /** Restarting would-be parent sent an invalid success message */
+       void invalidSuccessInRestarting() {
+               upstreamRestartFailed();
+       }
+
+       /** Had a connection, lost it */
+       public void lostParentConnectionWasSuccessful() {
+               SubscribeHandler[] handlers;
+               ClientSubscriptionHandler[] clients;
+               synchronized(this) {
+                       restarting = true;
+                       subscribed = false;
+                       rootLocation = -1.0;
+                       parent = null;
+                       handlers = subscriberPeerHandlers;
+                       clients = localSubscribers;
+               }
+               for(int i=0;i<handlers.length;i++)
+                       handlers[i].sendStatusMessage(false, -1.0);
+               for(int i=0;i<clients.length;i++)
+                       clients[i].notifyDisconnected();
+       }
 }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java    
    2005-10-18 14:14:07 UTC (rev 7437)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java    
    2005-10-18 19:54:33 UTC (rev 7438)
@@ -97,7 +97,7 @@
     synchronized void drop(SubscriptionHandler sub, PublishStreamKey key) {
         SubscriptionHandler oldSub = 
             (SubscriptionHandler) subscriptionsByKey.remove(key);
-        if(oldSub != sub) {
+        if(oldSub != sub && oldSub != null) {
             Logger.error(this, "Dropping "+sub+" but remove returned "+oldSub);
         }
     }

Modified: 
branches/publish-subscribe/freenet/src/freenet/node/TextModeClientInterface.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/TextModeClientInterface.java
    2005-10-18 14:14:07 UTC (rev 7437)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/TextModeClientInterface.java
    2005-10-18 19:54:33 UTC (rev 7438)
@@ -513,48 +513,35 @@
         }
 
         public void got(long packetNumber, byte[] data) {
-            try {
-                subscribedDataStream.write(name+":"+packetNumber+":"+new 
String(data)+"\n");
-                subscribedDataStream.flush();
-            } catch (IOException e) {
-                String s = "Error writing to subscriptions output file - disk 
full? "+e.getMessage();
-                Logger.error(this, s);
-                System.err.println(s);
-            }
+               write(name+":"+packetNumber+":"+new String(data)+"\n");
         }
 
-        public void lostConnection() {
+        private void write(String string) {
             try {
-                subscribedDataStream.write(name+":LOST CONNECTION\n");
+                subscribedDataStream.write(string);
                 subscribedDataStream.flush();
             } catch (IOException e) {
                 String s = "Error writing to subscriptions output file - disk 
full? "+e.getMessage();
                 Logger.error(this, s);
                 System.err.println(s);
             }
+               }
+
+               public void lostConnection() {
+                       write(name+":LOST CONNECTION\n");
         }
 
         public void restarted() {
-            try {
-                subscribedDataStream.write(name+":RESTARTED\n");
-                subscribedDataStream.flush();
-            } catch (IOException e) {
-                String s = "Error writing to subscriptions output file - disk 
full? "+e.getMessage();
-                Logger.error(this, s);
-                System.err.println(s);
-            }
+               write(name+":RESTARTED\n");
         }
 
         public void connected() {
-            try {
-                subscribedDataStream.write(name+":CONNECTED\n");
-                subscribedDataStream.flush();
-            } catch (IOException e) {
-                String s = "Error writing to subscriptions output file - disk 
full? "+e.getMessage();
-                Logger.error(this, s);
-                System.err.println(s);
-            }
+               write(name+":CONNECTED\n");
         }
+
+               public void lostConnectionFatal() {
+                       write(name+":PERMANENTLY CONNECTION\n");
+               }
     }
 
 }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to