Author: toad
Date: 2005-10-13 14:27:51 +0000 (Thu, 13 Oct 2005)
New Revision: 7429
Added:
branches/publish-subscribe/freenet/src/freenet/node/DroppingSubscriptionHandlerException.java
branches/publish-subscribe/freenet/src/freenet/node/QueuedSubscriptionRequest.java
branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
Log:
Missed files.
Added:
branches/publish-subscribe/freenet/src/freenet/node/DroppingSubscriptionHandlerException.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/DroppingSubscriptionHandlerException.java
2005-10-12 21:55:08 UTC (rev 7428)
+++
branches/publish-subscribe/freenet/src/freenet/node/DroppingSubscriptionHandlerException.java
2005-10-13 14:27:51 UTC (rev 7429)
@@ -0,0 +1,10 @@
+package freenet.node;
+
+/**
+ * Thrown when we try to do something to a SubscriptionHandler
+ * while it is being removed from the SubscriptionManager. This
+ * is to avoid messy, potentially deadlocking nested locks.
+ */
+public class DroppingSubscriptionHandlerException extends Exception {
+
+}
Added:
branches/publish-subscribe/freenet/src/freenet/node/QueuedSubscriptionRequest.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/QueuedSubscriptionRequest.java
2005-10-12 21:55:08 UTC (rev 7428)
+++
branches/publish-subscribe/freenet/src/freenet/node/QueuedSubscriptionRequest.java
2005-10-13 14:27:51 UTC (rev 7429)
@@ -0,0 +1,8 @@
+package freenet.node;
+
+/**
+ * A queued subscription request.
+ */
+public class QueuedSubscriptionRequest {
+
+}
Added: branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
2005-10-12 21:55:08 UTC (rev 7428)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
2005-10-13 14:27:51 UTC (rev 7429)
@@ -0,0 +1,118 @@
+package freenet.node;
+
+import freenet.io.comm.DMT;
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
+import freenet.node.SubscriptionHandler.PacketItem;
+import freenet.support.Logger;
+import freenet.support.NumberedItem;
+
+/**
+ * Handler for a node subscribed through us.
+ * Handles the maintenance of the connection, *not* the relaying of data
+ * chunks.
+ * On creation, should tell the downstream node the current status ASAP.
+ * After that, should keep it up to date.
+ *
+ * Also carries enough data to forward the request, if the request is
+ * queued due to a previous one being served at the time of receipt. If
+ * the previous one succeeds, we send success. If the previous one fails,
+ * we start.
+ */
+public class SubscribeHandler {
+
+ /** Our node */
+ final Node node;
+ /** Our original ID */
+ final long origID;
+ /** Our original HTL */
+ final short origHTL;
+ /** Our stream */
+ final SubscriptionHandler sub;
+ /** Our node */
+ final PeerNode subscriber;
+ /** Nearest location so far on incoming request */
+ final double origNearestLoc;
+ /** Updated nearest location so far */
+ private double nearestLoc;
+ /** Our next message number counter.
+ * Subscription status messages have to be ordered because of various
+ * race conditions.
+ */
+ private int counter;
+
+ /**
+ * Should be called with parent lock held.
+ */
+ SubscribeHandler(Node node, long id, short htl, PeerNode source,
+ int orderedMessagesSeq, double origNearestLoc, double
newNearestLoc, SubscriptionHandler handler, NumberedItem[] items) {
+ this.node = node;
+ counter = orderedMessagesSeq;
+ subscriber = source;
+ origID = id;
+ origHTL = htl;
+ this.sub = handler;
+ this.origNearestLoc = origNearestLoc;
+ this.nearestLoc = nearestLoc;
+ boolean rejectRNF = false;
+ double rootLoc = sub.rootLoc();
+ double target = sub.targetLocation();
+ boolean subscribed = sub.isSubscribed();
+ if(subscribed) {
+ if(Math.abs(target-rootLoc) <= Math.abs(target-origNearestLoc)) {
+ // Root is closer to target than request was
+ // Just accept it
+ } else {
+ // Root is further away from target than request was!
+ rejectRNF = true;
+ }
+ } else {
+ // We are restarting, or we are about to restart
+ // Just accept it
+ }
+ if(rejectRNF) {
+ // FIXME: lock safety? Should do this off-thread?
+ Message msg = DMT.createFNPRouteNotFound(origID,
node.decrementHTL(source, origHTL));
+ try {
+ source.sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ Logger.error(this, "Could not send RNF to "+source);
+ }
+ } else {
+ // Add self to parent
+ sub.addSubscriberHandler(this);
+ // FIXME: lock safety? Should do this off-thread?
+ try {
+ if(sendStatusMessage(subscribed, rootLoc)) {
+ for(int i=0;i<items.length;i++) {
+ PacketItem item = (PacketItem) items[i];
+ item.forwardTo(source);
+ }
+ }
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Disconnected from "+source);
+ sub.removeSubscriberHandler(this);
+ }
+ }
+ }
+
+ /**
+ * Send the current status to the source node.
+ * @return True unless we disconnected.
+ */
+ private boolean sendStatusMessage(boolean subscribed, double rootLoc) {
+ Message msg;
+ if(subscribed)
+ msg = DMT.createFNPSubscribeSucceeded(origID, counter++, rootLoc);
+ else
+ msg = DMT.createFNPSubscribeRestarted(origID, sub.getRestartUID(),
counter++);
+ try {
+ subscriber.sendAsync(msg, null);
+ return true;
+ } catch (NotConnectedException e) {
+ Logger.error(this, "Not connected sending status message to
"+subscriber);
+ sub.removeSubscriberHandler(this);
+ return false;
+ }
+ }
+}
Added: branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
2005-10-12 21:55:08 UTC (rev 7428)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
2005-10-13 14:27:51 UTC (rev 7429)
@@ -0,0 +1,288 @@
+package freenet.node;
+
+import java.util.HashSet;
+
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.support.Logger;
+
+/**
+ * SubscriptionRequest sender. Analogous to RequestSender.
+ */
+public class SubscribeSender implements Runnable {
+
+ final Node node;
+ final long id;
+ final double target;
+ private short htl;
+ final PeerNode source;
+ final SubscriptionHandler handler;
+ final double nearestSoFar;
+ private final HashSet nodesRoutedTo;
+ private PeerNode next;
+ private int counter;
+
+ // Still running
+ static final int SEARCHING = -1;
+ // Found a closer node than the closest-so-far
+ static final int SUCCESS = 0;
+ // RNF => a) we are the closest node, we become root, or b) we fail and
return RNF
+ static final int ROUTE_NOT_FOUND = 1;
+ // A node was overloaded
+ static final int REJECTED_OVERLOAD = 2;
+ // An internal error occurred
+ static final int INTERNAL_ERROR = 3;
+ // A node is restarting
+ static final int RESTARTING = 4;
+
+ static final int INITIAL_TIMEOUT = 5000;
+ static final int RESTART_TIMEOUT = 120*1000;
+
+ private int status;
+
+ int getStatus() {
+ return status;
+ }
+
+ /**
+ * Constructor.
+ * Note that we do not start() in the constructor for locking reasons.
+ * @param id The message ID of the SubscribeRequest chain.
+ * @param htl The HTL currently remaining on the SubscribeRequest chain.
Passed in
+ * "raw"; we get to reset it.
+ * @param source The source node. Null if this is locally originated.
+ * @param handler The SubscriptionHandler which created us, and who want
to know
+ * when our status changes.
+ */
+ public SubscribeSender(long id, short htl, PeerNode source, double
nearestSoFar, SubscriptionHandler handler) {
+ this.id = id;
+ this.htl = htl;
+ this.source = source;
+ this.handler = handler;
+ this.nearestSoFar = nearestSoFar;
+ target = handler.targetLocation();
+ node = handler.node;
+ status = -1;
+ nodesRoutedTo = new HashSet();
+ }
+
+ public void run() {
+ try {
+ realRun();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t+" in SubscribeSender for "+id, t);
+ setStatus(INTERNAL_ERROR);
+ }
+ }
+
+ /**
+ * The real run() method - we keep the try/catch in a separate
+ * method so we don't forget to handle errors.
+ */
+ private void realRun() {
+ while(true) {
+ // Phase 1 (find the node)
+ if(status == SEARCHING)
+ runPhase1();
+
+ // Phase 2 - wait for change
+ if(status == SUCCESS)
+ runPhase2Succeeded();
+ else if(status == RESTARTING)
+ runPhase2Restarting();
+ else return;
+ }
+ }
+
+ /**
+ * A node sent us an FNPSubscribeRestarting.
+ * Wait for it to succeed, fail, or timeout.
+ */
+ private void runPhase2Restarting() {
+ /* What happens now??
+ * One of three things:
+ * - We get an error
+ * - We get a timeout
+ * - We get success
+ *
+ * Success: FNPSubscribeSucceeded, FNPSubscribeSucceededNewRoot
+ *
+ * Error: RejectedOverload, RouteNotFound
+ */
+ MessageFilter mfSubscribeSucceeded =
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
id).setField(DMT.ORDERED_MESSAGE_NUM,
counter).setType(DMT.FNPSubscribeSucceeded);
+ MessageFilter mfSubscribeSucceededNewRoot =
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
id).setField(DMT.ORDERED_MESSAGE_NUM,
counter).setType(DMT.FNPSubscribeSucceededNewRoot);
+ MessageFilter mfRejectedOverload =
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
id).setType(DMT.FNPRejectedOverload);
+ MessageFilter mfRouteNotFound =
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
id).setType(DMT.FNPRouteNotFound);
+
+ MessageFilter mf =
mfSubscribeSucceeded.or(mfSubscribeSucceededNewRoot.or(mfRejectedOverload.or(mfRouteNotFound)));
+ counter++;
+
+ try {
+ Message msg = node.usm.waitFor(mf);
+
+ if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
+ // :(
+ // Timeout or rejected:overload
+ Logger.error(this, "Timeout or rejected:overload in
restarting: "+msg+" for "+id+" to "+next);
+ setStatus(REJECTED_OVERLOAD);
+ return;
+ }
+
+ if(msg.getSpec() == DMT.FNPRouteNotFound) {
+
+ }
+
+ // TODO Auto-generated method stub
+ } catch (DisconnectedException e) {
+ Logger.minor(this, "Lost connection in restarting");
+ // Lost him
+ handler.lostParentConnectionAlreadyRestarting();
+ setStatus(SEARCHING);
+ return;
+ }
+
+ }
+
+ /**
+ * A node sent us an FNPSubscribeSucceeded.
+ * Wait for anything to change - supervises the subscription,
+ * essentially.
+ */
+ private void runPhase2Succeeded() {
+ /* What can change?
+ * We can get an FNPSubscribeRestarted, or
+ * we can lose the connection.
+ */
+ MessageFilter mf =
MessageFilter.create().setType(DMT.FNPSubscribeRestarted).setField(DMT.UID,id).setSource(next).setTimeout(120*1000);
+ while(true) {
+ try {
+ Message msg = node.usm.waitFor(mf);
+ if(msg != null) {
+ setStatus(RESTARTING);
+ return;
+ } // else continue
+ } catch (DisconnectedException e) {
+ // Lost him
+ Logger.minor(this, "Lost connection in succeeded");
+ handler.lostParentConnectionAlreadyRestarting();
+ setStatus(SEARCHING);
+ return;
+ }
+ }
+ }
+
+ /**
+ * Find a node to subscribe to.
+ */
+ private void runPhase1() {
+ while(true) {
+ // Check HTL > 0
+ if(htl <= 0) {
+ setStatus(ROUTE_NOT_FOUND);
+ return;
+ }
+
+ // Route
+
+ synchronized(node.peers) {
+ next = node.peers.closerPeer(source, nodesRoutedTo, target,
true);
+ }
+
+ if(next == null) {
+ // Backtrack
+ setStatus(ROUTE_NOT_FOUND);
+ return;
+ }
+ Logger.minor(this, "Routing subscribe to "+next);
+ nodesRoutedTo.add(next);
+
+ // Create message, including new counter
+ counter = node.random.nextInt();
+ Message msg = DMT.createFNPSubscribeRequest(id, htl, handler.key,
handler.getLastSeqNum(), nearestSoFar, counter);
+
+ try {
+ // Send it
+ next.sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Disconnected from "+next);
+ continue;
+ }
+
+ // Wait for success/failure/etc
+
+ /* What can happen?
+ * RejectedLoop can happen (ID collision)
+ * RejectedOverload can happen, maybe
+ * RouteNotFound ???? probably not
+ * SubscribeSucceeded can definitely happen (~= accepted)
+ * SubscribeRestarted can definitely happen (~= accepted)
+ * SubscribeSucceededNewRoot ??? probably not
+ *
+ * Anything else?
+ */
+
+ MessageFilter mfRejectedLoop =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPRejectedLoop).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
counter);
+ MessageFilter mfRejectedOverload =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPRejectedOverload).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
counter);
+ // These two don't need a counter as they terminate our contact
with the node
+ MessageFilter mfSubscribeSucceeded =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPSubscribeSucceeded).setTimeout(INITIAL_TIMEOUT);
+ MessageFilter mfSubscribeRestarted =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPSubscribeRestarted).setTimeout(INITIAL_TIMEOUT);
+ counter++;
+
+ MessageFilter mf =
mfRejectedLoop.or(mfRejectedOverload.or(mfSubscribeSucceeded.or(mfSubscribeRestarted)));
+
+ try {
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e1) {
+ Logger.normal(this, "Disconnected: "+next+" while waiting");
+ htl = node.decrementHTL(source, htl);
+ continue;
+ }
+
+ if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
+ // Timeout or rejected:overload
+ Logger.error(this, "Timeout or rejected:overload: "+msg+" for
"+id+" to "+next);
+ setStatus(REJECTED_OVERLOAD);
+ return;
+ }
+
+ if(msg.getSpec() == DMT.FNPRejectedLoop) {
+ Logger.minor(this, "Loop; decrementing htl");
+ htl = node.decrementHTL(source, htl);
+ continue;
+ }
+
+ if(msg.getSpec() == DMT.FNPSubscribeSucceeded) {
+ // Success!
+ double rootLoc = msg.getDouble(DMT.ROOT_LOCATION);
+ if(rootLoc < 0.0 || rootLoc > 1.0) {
+ Logger.error(this, "Invalid root loc: "+rootLoc+" from
"+next);
+ msg = DMT.createFNPUnsubscribe(id, counter);
+ next.sendAsync(msg, null);
+ continue;
+ }
+ // Validated success
+ handler.subscribeSucceeded(next, rootLoc);
+ setStatus(SUCCESS);
+ break;
+ }
+
+ if(msg.getSpec() == DMT.FNPSubscribeRestarted) {
+ long restartID = msg.getLong(DMT.RESTART_UID);
+ handler.subscribeRestarted(next, restartID);
+ setStatus(RESTARTING);
+ break;
+ }
+
+ Logger.error(this, "Unrecognized message: "+msg+" in phase 1");
+ }
+ }
+
+ public void setStatus(int newStatus) {
+ status = newStatus;
+ handler.statusChange(this, status);
+ }
+
+}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs