Author: toad
Date: 2005-10-13 16:20:26 +0000 (Thu, 13 Oct 2005)
New Revision: 7430
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
Log:
Mostly done the substance of SubscribeSender.
Now need to re-examine the ties to SubscribeHandler.
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
2005-10-13 14:27:51 UTC (rev 7429)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
2005-10-13 16:20:26 UTC (rev 7430)
@@ -25,17 +25,17 @@
private PeerNode next;
private int counter;
- // Still running
+ /** Still running */
static final int SEARCHING = -1;
- // Found a closer node than the closest-so-far
+ /** Found a closer node than the closest-so-far */
static final int SUCCESS = 0;
- // RNF => a) we are the closest node, we become root, or b) we fail and
return RNF
+ /** RNF => a) we are the closest node, we become root, or b) we fail and
return RNF. */
static final int ROUTE_NOT_FOUND = 1;
- // A node was overloaded
+ /** A node was overloaded */
static final int REJECTED_OVERLOAD = 2;
- // An internal error occurred
+ /** An internal error occurred */
static final int INTERNAL_ERROR = 3;
- // A node is restarting
+ /** A node further up the tree is restarting its subscription */
static final int RESTARTING = 4;
static final int INITIAL_TIMEOUT = 5000;
@@ -65,7 +65,7 @@
this.nearestSoFar = nearestSoFar;
target = handler.targetLocation();
node = handler.node;
- status = -1;
+ status = SEARCHING;
nodesRoutedTo = new HashSet();
}
@@ -87,7 +87,7 @@
// Phase 1 (find the node)
if(status == SEARCHING)
runPhase1();
-
+
// Phase 2 - wait for change
if(status == SUCCESS)
runPhase2Succeeded();
@@ -132,14 +132,31 @@
}
if(msg.getSpec() == DMT.FNPRouteNotFound) {
-
+ // It didn't find anywhere closer than us or someone behind us
:(
+ Logger.minor(this, msg.toString()+" in runPhase2Restarting");
+ setStatus(SEARCHING);
+ return;
}
- // TODO Auto-generated method stub
+ if(msg.getSpec() == DMT.FNPSubscribeSucceeded || msg.getSpec() ==
DMT.FNPSubscribeSucceededNewRoot) {
+ // Success!
+ double rootLoc = msg.getDouble(DMT.ROOT_LOCATION);
+ if(rootLoc < 0.0 || rootLoc > 1.0) {
+ Logger.error(this, "Invalid root loc: "+rootLoc+" from
"+next);
+ msg = DMT.createFNPUnsubscribe(id, counter);
+ next.sendAsync(msg, null);
+ continue;
+ }
+ // Validated success
+ handler.subscribeSucceeded(next, rootLoc);
+ setStatus(SUCCESS);
+ break;
+ }
+
} catch (DisconnectedException e) {
Logger.minor(this, "Lost connection in restarting");
// Lost him
- handler.lostParentConnectionAlreadyRestarting();
+ handler.lostParentConnectionWasRestarting();
setStatus(SEARCHING);
return;
}
@@ -156,18 +173,31 @@
* We can get an FNPSubscribeRestarted, or
* we can lose the connection.
*/
+ // REDFLAG: Do we want a way for a node to drop a subscription itself,
apart from
+ // restarting then failing?
MessageFilter mf =
MessageFilter.create().setType(DMT.FNPSubscribeRestarted).setField(DMT.UID,id).setSource(next).setTimeout(120*1000);
+ MessageFilter mfRejectedOverload =
MessageFilter.create().setSource(next).setTimeout(120*1000).setField(DMT.UID,
id).setType(DMT.FNPRejectedOverload);
+
while(true) {
try {
Message msg = node.usm.waitFor(mf);
- if(msg != null) {
+ if(msg == null) {
+ // Keep on waiting
+ continue;
+ } else if(msg.getSpec() == DMT.FNPSubscribeRestarted) {
+ Logger.debug(this, "Restarting");
setStatus(RESTARTING);
return;
- } // else continue
+ } else if(msg.getSpec() == DMT.FNPRejectedOverload) {
+ Logger.debug(this, "Rejected: overload while in
succeeded on "+this);
+ setStatus(SEARCHING);
+ return;
+ }
+ Logger.error(this, "WTF?: "+msg);
} catch (DisconnectedException e) {
// Lost him
Logger.minor(this, "Lost connection in succeeded");
- handler.lostParentConnectionAlreadyRestarting();
+ handler.lostParentConnectionWasSuccessful();
setStatus(SEARCHING);
return;
}
@@ -216,19 +246,22 @@
/* What can happen?
* RejectedLoop can happen (ID collision)
* RejectedOverload can happen, maybe
- * RouteNotFound ???? probably not
+ * RouteNotFound - yes, if we are closer than any downstream
* SubscribeSucceeded can definitely happen (~= accepted)
* SubscribeRestarted can definitely happen (~= accepted)
* SubscribeSucceededNewRoot ??? probably not
*
* Anything else?
*/
+
+ // Terminal - don't need a counter (no further contact with the
node)
+ MessageFilter mfRejectedLoop =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPRejectedLoop).setTimeout(INITIAL_TIMEOUT);
+ MessageFilter mfRejectedOverload =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPRejectedOverload).setTimeout(INITIAL_TIMEOUT);
+ MessageFilter mfRouteNotFound =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPRouteNotFound).setTimeout(INITIAL_TIMEOUT);
- MessageFilter mfRejectedLoop =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPRejectedLoop).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
counter);
- MessageFilter mfRejectedOverload =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPRejectedOverload).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
counter);
- // These two don't need a counter as they terminate our contact
with the node
- MessageFilter mfSubscribeSucceeded =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPSubscribeSucceeded).setTimeout(INITIAL_TIMEOUT);
- MessageFilter mfSubscribeRestarted =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPSubscribeRestarted).setTimeout(INITIAL_TIMEOUT);
+ // Non-terminal - DO need a counter
+ MessageFilter mfSubscribeSucceeded =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPSubscribeSucceeded).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
counter);
+ MessageFilter mfSubscribeRestarted =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPSubscribeRestarted).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
counter);
counter++;
MessageFilter mf =
mfRejectedLoop.or(mfRejectedOverload.or(mfSubscribeSucceeded.or(mfSubscribeRestarted)));
@@ -276,11 +309,24 @@
break;
}
+ if(msg.getSpec() == DMT.FNPRouteNotFound) {
+ // We are closer than they are
+ // RNF does not carry a location because of false minima
+ // It does however carry an HTL
+ // But we ignore HTL > what we had before
+ short newHTL = msg.getShort(DMT.HTL);
+ Logger.debug(this, "RNF: "+msg+" my htl currently "+htl);
+ if(htl > newHTL) htl = newHTL;
+ // Try next node, if have any HTL left
+ continue;
+ }
+
Logger.error(this, "Unrecognized message: "+msg+" in phase 1");
}
}
public void setStatus(int newStatus) {
+ Logger.debug(this, "Setting status to "+newStatus+" on "+this);
status = newStatus;
handler.statusChange(this, status);
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs