Author: toad
Date: 2005-10-13 14:27:51 +0000 (Thu, 13 Oct 2005)
New Revision: 7429

Added:
   
branches/publish-subscribe/freenet/src/freenet/node/DroppingSubscriptionHandlerException.java
   
branches/publish-subscribe/freenet/src/freenet/node/QueuedSubscriptionRequest.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
   branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
Log:
Missed files.

Added: 
branches/publish-subscribe/freenet/src/freenet/node/DroppingSubscriptionHandlerException.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/DroppingSubscriptionHandlerException.java
       2005-10-12 21:55:08 UTC (rev 7428)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/DroppingSubscriptionHandlerException.java
       2005-10-13 14:27:51 UTC (rev 7429)
@@ -0,0 +1,10 @@
+package freenet.node;
+
+/**
+ * Thrown when we try to do something to a SubscriptionHandler
+ * while it is being removed from the SubscriptionManager. This
+ * is to avoid messy, potentially deadlocking nested locks.
+ * <p>
+ * This is a checked exception (it extends Exception directly), so
+ * callers must either catch it or declare it. The class declares no
+ * members of its own: only the implicit no-argument constructor is
+ * available, so instances carry no detail message and no cause.
+ */
+public class DroppingSubscriptionHandlerException extends Exception {
+
+}

Added: 
branches/publish-subscribe/freenet/src/freenet/node/QueuedSubscriptionRequest.java
===================================================================
--- 
branches/publish-subscribe/freenet/src/freenet/node/QueuedSubscriptionRequest.java
  2005-10-12 21:55:08 UTC (rev 7428)
+++ 
branches/publish-subscribe/freenet/src/freenet/node/QueuedSubscriptionRequest.java
  2005-10-13 14:27:51 UTC (rev 7429)
@@ -0,0 +1,8 @@
+package freenet.node;
+
+/**
+ * A queued subscription request.
+ * <p>
+ * Currently an empty placeholder: the class declares no fields,
+ * constructors or methods yet.
+ */
+public class QueuedSubscriptionRequest {
+
+}

Added: branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java   
2005-10-12 21:55:08 UTC (rev 7428)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java   
2005-10-13 14:27:51 UTC (rev 7429)
@@ -0,0 +1,118 @@
+package freenet.node;
+
+import freenet.io.comm.DMT;
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
+import freenet.node.SubscriptionHandler.PacketItem;
+import freenet.support.Logger;
+import freenet.support.NumberedItem;
+
+/**
+ * Handler for a node subscribed through us.
+ * Handles the maintenance of the connection, *not* the relaying of data
+ * chunks.
+ * On creation, should tell the downstream node the current status ASAP.
+ * After that, should keep it up to date.
+ * 
+ * Also carries enough data to forward the request, if the request is
+ * queued due to a previous one being served at the time of receipt. If
+ * the previous one succeeds, we send success. If the previous one fails,
+ * we start.
+ */
+public class SubscribeHandler {
+
+    /** Our node; used to decrement the HTL when we reject a request */
+    final Node node;
+    /** ID of the incoming request; used as the ID of every message we send back */
+    final long origID;
+    /** HTL of the incoming request, as passed to the constructor */
+    final short origHTL;
+    /** The subscription (stream) this handler registers itself with */
+    final SubscriptionHandler sub;
+    /** The node which subscribed through us; status messages are sent to it */
+    final PeerNode subscriber;
+    /** Nearest location so far on incoming request */
+    final double origNearestLoc;
+    /** Updated nearest location so far */
+    private double nearestLoc;
+    /** Our next message number counter.
+     * Subscription status messages have to be ordered because of various
+     * race conditions.
+     * Seeded from the constructor's orderedMessagesSeq argument and
+     * incremented once for every status message we create.
+     */
+    private int counter;
+    
+    /**
+     * Create a handler for a node subscribing through us and answer it
+     * straight away: either reject the request with an FNPRouteNotFound,
+     * or register with the parent SubscriptionHandler, send the current
+     * status and forward the supplied packets.
+     * Should be called with parent lock held.
+     * @param node Our node.
+     * @param id ID of the incoming request; used on every reply.
+     * @param htl HTL of the incoming request.
+     * @param source The node which sent the request; all replies go to it.
+     * @param orderedMessagesSeq Initial value of the ordered message counter.
+     * @param origNearestLoc Nearest location so far on the incoming request.
+     * @param newNearestLoc Updated nearest location so far.
+     * @param handler The subscription this handler belongs to.
+     * @param items Packets to forward to the source once the status message
+     * has been sent. Every element is cast to PacketItem.
+     */
+    SubscribeHandler(Node node, long id, short htl, PeerNode source,
+            int orderedMessagesSeq, double origNearestLoc, double newNearestLoc,
+            SubscriptionHandler handler, NumberedItem[] items) {
+        this.node = node;
+        counter = orderedMessagesSeq;
+        subscriber = source;
+        origID = id;
+        origHTL = htl;
+        this.sub = handler;
+        this.origNearestLoc = origNearestLoc;
+        // Must take the parameter: assigning the field to itself would
+        // silently discard the caller's value and leave the field at 0.0.
+        this.nearestLoc = newNearestLoc;
+        boolean rejectRNF = false;
+        double rootLoc = sub.rootLoc();
+        double target = sub.targetLocation();
+        boolean subscribed = sub.isSubscribed();
+        if(subscribed) {
+            if(Math.abs(target-rootLoc) <= Math.abs(target-origNearestLoc)) {
+                // Root is closer to target than request was
+                // Just accept it
+            } else {
+                // Root is further away from target than request was!
+                rejectRNF = true;
+            }
+        } else {
+            // We are restarting, or we are about to restart
+            // Just accept it
+        }
+        if(rejectRNF) {
+            // FIXME: lock safety? Should do this off-thread?
+            Message msg = DMT.createFNPRouteNotFound(origID, node.decrementHTL(source, origHTL));
+            try {
+                source.sendAsync(msg, null);
+            } catch (NotConnectedException e) {
+                Logger.error(this, "Could not send RNF to "+source);
+            }
+        } else {
+            // Add self to parent
+            sub.addSubscriberHandler(this);
+            // FIXME: lock safety? Should do this off-thread?
+            try {
+                // Only forward the packets if the status message went out
+                if(sendStatusMessage(subscribed, rootLoc)) {
+                    for(int i=0;i<items.length;i++) {
+                        PacketItem item = (PacketItem) items[i];
+                        item.forwardTo(source);
+                    }
+                }
+            } catch (NotConnectedException e) {
+                Logger.normal(this, "Disconnected from "+source);
+                sub.removeSubscriberHandler(this);
+            }
+        }
+    }
+
+    /**
+     * Tell the subscriber where the subscription currently stands: an
+     * FNPSubscribeSucceeded carrying the root location if we are
+     * subscribed, otherwise an FNPSubscribeRestarted carrying the restart
+     * UID. Either way one value of the ordered message counter is used up.
+     * If the subscriber turns out not to be connected, this handler is
+     * removed from its parent.
+     * @param subscribed Whether the subscription is currently established.
+     * @param rootLoc The root location to report; only used if subscribed.
+     * @return True unless we disconnected.
+     */
+    private boolean sendStatusMessage(boolean subscribed, double rootLoc) {
+        Message status = subscribed
+            ? DMT.createFNPSubscribeSucceeded(origID, counter++, rootLoc)
+            : DMT.createFNPSubscribeRestarted(origID, sub.getRestartUID(), counter++);
+        try {
+            subscriber.sendAsync(status, null);
+        } catch (NotConnectedException e) {
+            Logger.error(this, "Not connected sending status message to "+subscriber);
+            sub.removeSubscriberHandler(this);
+            return false;
+        }
+        return true;
+    }
+}

Added: branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java    
2005-10-12 21:55:08 UTC (rev 7428)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java    
2005-10-13 14:27:51 UTC (rev 7429)
@@ -0,0 +1,288 @@
+package freenet.node;
+
+import java.util.HashSet;
+
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.support.Logger;
+
+/**
+ * SubscriptionRequest sender. Analogous to RequestSender.
+ */
+public class SubscribeSender implements Runnable {
+
+    final Node node;
+    final long id;
+    final double target;
+    private short htl;
+    final PeerNode source;
+    final SubscriptionHandler handler;
+    final double nearestSoFar;
+    private final HashSet nodesRoutedTo;
+    private PeerNode next;
+    private int counter;
+    
+    // Still running
+    static final int SEARCHING = -1;
+    // Found a closer node than the closest-so-far
+    static final int SUCCESS = 0;
+    // RNF => a) we are the closest node, we become root, or b) we fail and 
return RNF
+    static final int ROUTE_NOT_FOUND = 1;
+    // A node was overloaded
+    static final int REJECTED_OVERLOAD = 2;
+    // An internal error occurred
+    static final int INTERNAL_ERROR = 3;
+    // A node is restarting
+    static final int RESTARTING = 4;
+
+    static final int INITIAL_TIMEOUT = 5000;
+    static final int RESTART_TIMEOUT = 120*1000;
+    
+    private int status;
+    
+    /**
+     * @return The current status, i.e. one of SEARCHING, SUCCESS,
+     * ROUTE_NOT_FOUND, REJECTED_OVERLOAD, INTERNAL_ERROR or RESTARTING.
+     */
+    int getStatus() {
+        return this.status;
+    }
+    
+    /**
+     * Constructor.
+     * Note that we do not start() in the constructor for locking reasons.
+     * The target location and our node are both taken from the handler,
+     * and the status starts out as SEARCHING.
+     * @param id The message ID of the SubscribeRequest chain.
+     * @param htl The HTL currently remaining on the SubscribeRequest chain.
+     * Passed in "raw"; we get to reset it.
+     * @param source The source node. Null if this is locally originated.
+     * @param nearestSoFar The nearest location so far; included in every
+     * request we send on.
+     * @param handler The SubscriptionHandler which created us, and who want
+     * to know when our status changes.
+     */
+    public SubscribeSender(long id, short htl, PeerNode source, double nearestSoFar,
+            SubscriptionHandler handler) {
+        this.id = id;
+        this.htl = htl;
+        this.source = source;
+        this.handler = handler;
+        this.nearestSoFar = nearestSoFar;
+        target = handler.targetLocation();
+        node = handler.node;
+        // Assigned directly rather than through setStatus(), so the handler
+        // is not notified while we are still being constructed.
+        status = SEARCHING;
+        nodesRoutedTo = new HashSet();
+    }
+
+    public void run() {
+        try {
+            realRun();
+        } catch (Throwable t) {
+            Logger.error(this, "Caught "+t+" in SubscribeSender for "+id, t);
+            setStatus(INTERNAL_ERROR);
+        }
+    }
+
+    /**
+     * The real run() method - the try/catch lives in run() so that
+     * error handling cannot be forgotten here.
+     * Alternates between searching for a node (phase 1) and supervising
+     * the node we found (phase 2) until the status is neither SUCCESS nor
+     * RESTARTING once phase 1 has had its turn.
+     */
+    private void realRun() {
+        for(;;) {
+            // Phase 1 (find the node)
+            if(status == SEARCHING) {
+                runPhase1();
+            }
+
+            // Phase 2 - wait for change
+            switch(status) {
+                case SUCCESS:
+                    runPhase2Succeeded();
+                    break;
+                case RESTARTING:
+                    runPhase2Restarting();
+                    break;
+                default:
+                    // Any other status ends the sender
+                    return;
+            }
+        }
+    }
+
+    /**
+     * A node sent us an FNPSubscribeRestarted.
+     * Wait for it to succeed, fail, or timeout.
+     * NOTE(review): only timeout, rejected-overload and disconnection are
+     * acted on so far; a success or route-not-found reply leaves the
+     * status as it was.
+     */
+    private void runPhase2Restarting() {
+        /* Possible outcomes:
+         * - success: FNPSubscribeSucceeded or FNPSubscribeSucceededNewRoot
+         * - error: FNPRejectedOverload or FNPRouteNotFound
+         * - timeout: waitFor() hands back null
+         */
+        // The two success filters also match on the ordered message
+        // number; the two error filters match on the UID only.
+        MessageFilter succeeded = MessageFilter.create().setSource(next)
+            .setTimeout(RESTART_TIMEOUT).setField(DMT.UID, id)
+            .setField(DMT.ORDERED_MESSAGE_NUM, counter)
+            .setType(DMT.FNPSubscribeSucceeded);
+        MessageFilter succeededNewRoot = MessageFilter.create().setSource(next)
+            .setTimeout(RESTART_TIMEOUT).setField(DMT.UID, id)
+            .setField(DMT.ORDERED_MESSAGE_NUM, counter)
+            .setType(DMT.FNPSubscribeSucceededNewRoot);
+        MessageFilter rejectedOverload = MessageFilter.create().setSource(next)
+            .setTimeout(RESTART_TIMEOUT).setField(DMT.UID, id)
+            .setType(DMT.FNPRejectedOverload);
+        MessageFilter routeNotFound = MessageFilter.create().setSource(next)
+            .setTimeout(RESTART_TIMEOUT).setField(DMT.UID, id)
+            .setType(DMT.FNPRouteNotFound);
+
+        MessageFilter anyReply =
+            succeeded.or(succeededNewRoot.or(rejectedOverload.or(routeNotFound)));
+        counter++;
+
+        try {
+            Message reply = node.usm.waitFor(anyReply);
+
+            boolean timedOut = (reply == null);
+            if(timedOut || reply.getSpec() == DMT.FNPRejectedOverload) {
+                // :(
+                // Timeout or rejected:overload
+                Logger.error(this, "Timeout or rejected:overload in restarting: "+reply+" for "+id+" to "+next);
+                setStatus(REJECTED_OVERLOAD);
+                return;
+            }
+
+            if(reply.getSpec() == DMT.FNPRouteNotFound) {
+                // Not handled yet
+            }
+
+            // TODO Auto-generated method stub
+        } catch (DisconnectedException e) {
+            Logger.minor(this, "Lost connection in restarting");
+            // Lost him
+            handler.lostParentConnectionAlreadyRestarting();
+            setStatus(SEARCHING);
+            return;
+        }
+
+    }
+
+    /**
+     * A node sent us an FNPSubscribeSucceeded.
+     * Supervise the subscription: keep waiting until either the node
+     * sends an FNPSubscribeRestarted (status becomes RESTARTING) or the
+     * connection is lost (the handler is told, status becomes SEARCHING).
+     */
+    private void runPhase2Succeeded() {
+        // Only two things can change: we get an FNPSubscribeRestarted,
+        // or we lose the connection.
+        MessageFilter restartedFilter = MessageFilter.create()
+            .setType(DMT.FNPSubscribeRestarted)
+            .setField(DMT.UID,id)
+            .setSource(next)
+            .setTimeout(120*1000);
+        for(;;) {
+            try {
+                Message restarted = node.usm.waitFor(restartedFilter);
+                if(restarted == null) {
+                    // Nothing arrived (presumably the timeout elapsed): wait again
+                    continue;
+                }
+                setStatus(RESTARTING);
+                return;
+            } catch (DisconnectedException e) {
+                // Lost him
+                Logger.minor(this, "Lost connection in succeeded");
+                handler.lostParentConnectionAlreadyRestarting();
+                setStatus(SEARCHING);
+                return;
+            }
+        }
+    }
+
+    /**
+     * Find a node to subscribe to.
+     * Repeatedly asks node.peers.closerPeer() for the next peer (passing
+     * the set of peers already tried), sends it an FNPSubscribeRequest and
+     * waits for the answer. Returns once a peer has accepted (status
+     * SUCCESS or RESTARTING) or we have given up (status ROUTE_NOT_FOUND
+     * or REJECTED_OVERLOAD).
+     */
+    private void runPhase1() {
+        while(true) {
+            // Check HTL > 0
+            if(htl <= 0) {
+                setStatus(ROUTE_NOT_FOUND);
+                return;
+            }
+
+            // Route
+
+            synchronized(node.peers) {
+                next = node.peers.closerPeer(source, nodesRoutedTo, target, true);
+            }
+
+            if(next == null) {
+                // Backtrack
+                setStatus(ROUTE_NOT_FOUND);
+                return;
+            }
+            Logger.minor(this, "Routing subscribe to "+next);
+            nodesRoutedTo.add(next);
+
+            // Create message, including new counter
+            counter = node.random.nextInt();
+            Message msg = DMT.createFNPSubscribeRequest(id, htl, handler.key,
+                    handler.getLastSeqNum(), nearestSoFar, counter);
+
+            try {
+                // Send it
+                next.sendAsync(msg, null);
+            } catch (NotConnectedException e) {
+                Logger.normal(this, "Disconnected from "+next);
+                continue;
+            }
+
+            // Wait for success/failure/etc
+
+            /* What can happen?
+             * RejectedLoop can happen (ID collision)
+             * RejectedOverload can happen, maybe
+             * RouteNotFound ???? probably not
+             * SubscribeSucceeded can definitely happen (~= accepted)
+             * SubscribeRestarted can definitely happen (~= accepted)
+             * SubscribeSucceededNewRoot ??? probably not
+             *
+             * Anything else?
+             */
+
+            MessageFilter mfRejectedLoop =
+                MessageFilter.create().setField(DMT.UID, id).setSource(next)
+                    .setType(DMT.FNPRejectedLoop).setTimeout(INITIAL_TIMEOUT)
+                    .setField(DMT.ORDERED_MESSAGE_NUM, counter);
+            MessageFilter mfRejectedOverload =
+                MessageFilter.create().setField(DMT.UID, id).setSource(next)
+                    .setType(DMT.FNPRejectedOverload).setTimeout(INITIAL_TIMEOUT)
+                    .setField(DMT.ORDERED_MESSAGE_NUM, counter);
+            // These two don't need a counter as they terminate our contact with the node
+            MessageFilter mfSubscribeSucceeded =
+                MessageFilter.create().setField(DMT.UID, id).setSource(next)
+                    .setType(DMT.FNPSubscribeSucceeded).setTimeout(INITIAL_TIMEOUT);
+            MessageFilter mfSubscribeRestarted =
+                MessageFilter.create().setField(DMT.UID, id).setSource(next)
+                    .setType(DMT.FNPSubscribeRestarted).setTimeout(INITIAL_TIMEOUT);
+            counter++;
+
+            MessageFilter mf =
+                mfRejectedLoop.or(mfRejectedOverload.or(mfSubscribeSucceeded.or(mfSubscribeRestarted)));
+
+            try {
+                msg = node.usm.waitFor(mf);
+            } catch (DisconnectedException e1) {
+                Logger.normal(this, "Disconnected: "+next+" while waiting");
+                htl = node.decrementHTL(source, htl);
+                continue;
+            }
+
+            if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
+                // Timeout or rejected:overload
+                Logger.error(this, "Timeout or rejected:overload: "+msg+" for "+id+" to "+next);
+                setStatus(REJECTED_OVERLOAD);
+                return;
+            }
+
+            if(msg.getSpec() == DMT.FNPRejectedLoop) {
+                Logger.minor(this, "Loop; decrementing htl");
+                htl = node.decrementHTL(source, htl);
+                continue;
+            }
+
+            if(msg.getSpec() == DMT.FNPSubscribeSucceeded) {
+                // Success!
+                double rootLoc = msg.getDouble(DMT.ROOT_LOCATION);
+                // The value comes from the remote node. NaN has to be tested
+                // for explicitly because it fails both range comparisons.
+                if(Double.isNaN(rootLoc) || rootLoc < 0.0 || rootLoc > 1.0) {
+                    Logger.error(this, "Invalid root loc: "+rootLoc+" from "+next);
+                    msg = DMT.createFNPUnsubscribe(id, counter);
+                    try {
+                        next.sendAsync(msg, null);
+                    } catch (NotConnectedException e) {
+                        // Already gone, so there is nothing left to unsubscribe from
+                        Logger.normal(this, "Disconnected from "+next);
+                    }
+                    continue;
+                }
+                // Validated success
+                handler.subscribeSucceeded(next, rootLoc);
+                setStatus(SUCCESS);
+                break;
+            }
+
+            if(msg.getSpec() == DMT.FNPSubscribeRestarted) {
+                long restartID = msg.getLong(DMT.RESTART_UID);
+                handler.subscribeRestarted(next, restartID);
+                setStatus(RESTARTING);
+                break;
+            }
+
+            // Falls through to the next iteration, i.e. we route again
+            Logger.error(this, "Unrecognized message: "+msg+" in phase 1");
+        }
+    }
+
+    /**
+     * Record a new status and report it to the handler through
+     * statusChange().
+     * @param newStatus The status to switch to; the callers in this class
+     * always pass one of the status constants.
+     */
+    public void setStatus(int newStatus) {
+        this.status = newStatus;
+        handler.statusChange(this, this.status);
+    }
+    
+}

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to