Author: toad
Date: 2005-10-21 13:19:36 +0000 (Fri, 21 Oct 2005)
New Revision: 7442
Added:
branches/publish-subscribe/freenet/pubsub_notes/
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSenderCallback.java
Modified:
branches/publish-subscribe/freenet/.externalToolBuilders/New_Builder.launch
branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
branches/publish-subscribe/freenet/src/freenet/node/Node.java
branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
Log:
Last revision of pub/sub for a while. IMHO this is a blind alley, but it has
been useful; it will feed into the eventual Working Pub/Sub Architecture,
which will probably have a lot to do with passive requests and possibly
something to do with TUKs.
Modified:
branches/publish-subscribe/freenet/.externalToolBuilders/New_Builder.launch
===================================================================
--- branches/publish-subscribe/freenet/.externalToolBuilders/New_Builder.launch
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/.externalToolBuilders/New_Builder.launch
2005-10-21 13:19:36 UTC (rev 7442)
@@ -5,7 +5,7 @@
<booleanAttribute key="org.eclipse.jdt.launching.DEFAULT_CLASSPATH"
value="true"/>
<stringAttribute key="org.eclipse.ui.externaltools.ATTR_RUN_BUILD_KINDS"
value="full,incremental,"/>
<booleanAttribute key="org.eclipse.ant.ui.ATTR_TARGETS_UPDATED" value="true"/>
-<stringAttribute key="org.eclipse.ui.externaltools.ATTR_LOCATION"
value="${workspace_loc:/Freenet 0.7/build.xml}"/>
+<stringAttribute key="org.eclipse.ui.externaltools.ATTR_LOCATION"
value="${workspace_loc:/Freenet 0.7 publish subscribe branch/build.xml}"/>
<booleanAttribute key="org.eclipse.ui.externaltools.ATTR_TRIGGERS_CONFIGURED"
value="true"/>
<booleanAttribute key="org.eclipse.debug.core.appendEnvironmentVariables"
value="true"/>
<stringAttribute key="org.eclipse.jdt.launching.CLASSPATH_PROVIDER"
value="org.eclipse.ant.ui.AntClasspathProvider"/>
Modified: branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
2005-10-21 13:19:36 UTC (rev 7442)
@@ -85,6 +85,7 @@
public static final String MUST_BEAT_LOCATION = "mustBeatLocation";
public static final String ROOT_LOCATION = "rootLocation";
public static final String ORDERED_MESSAGE_NUM = "orderedMessageNum";
+ public static final String NEW_UID = "newUID";
//Diagnostic
public static final MessageType ping = new MessageType("ping") {{
@@ -781,6 +782,11 @@
msg.set(HTL, htl);
return msg;
}
+
+ public static final MessageType FNPCoalesceNotify = new
MessageType("FNPCoalesceNotify") {{
+ addField(UID, Long.class);
+ addField(NEW_UID, Long.class);
+ }};
public static final MessageType FNPPublishData = new
MessageType("FNPPublishData") {{
addField(UID, Long.class);
@@ -872,26 +878,6 @@
return msg;
}
- public static final MessageType FNPResubscribeRequest = new
MessageType("FNPResubscribeRequest") {{
- addField(RESTART_UID, Long.class);
- addField(HTL, Short.class);
- addField(KEY, PublishStreamKey.class);
- addField(LAST_SEQNO, Long.class);
- addField(NEAREST_LOCATION, Double.class);
- addField(MUST_BEAT_LOCATION, Double.class);
- }};
-
- public static final Message createFNPResubscribeRequest(long restartUID,
short htl, PublishStreamKey key, long lastSeqNum, double nearestSoFar, double
mustBeatLocation) {
- Message msg = new Message(FNPResubscribeRequest);
- msg.set(RESTART_UID, restartUID);
- msg.set(HTL, htl);
- msg.set(KEY, key);
- msg.set(LAST_SEQNO, lastSeqNum);
- msg.set(NEAREST_LOCATION, nearestSoFar);
- msg.set(MUST_BEAT_LOCATION, mustBeatLocation);
- return msg;
- }
-
public static final MessageType FNPSubscribeRestarted = new
MessageType("FNPSubscribeRestarted") {{
addField(UID, Long.class); // if it is a reply to a SubscribeRequest,
we need to be able to identify it,
// if only because of possible race conditions.
@@ -904,7 +890,6 @@
Message msg = new Message(FNPSubscribeRestarted);
msg.set(UID, uid);
msg.set(RESTART_UID, restartUID);
- msg.set(MUST_BEAT_LOCATION, orderedMessageNum);
return msg;
}
@@ -922,18 +907,6 @@
return msg;
}
- public static final MessageType FNPSubscribeSucceededNewRoot = new
MessageType("FNPSubscribeSucceededNewRoot") {{
- addField(ORDERED_MESSAGE_NUM, Integer.class);
- addField(UID, Long.class);
- }};
-
- public static final Message createFNPSubscribeSucceededNewRoot(long uid,
PublishStreamKey key, int orderedMessageNum) {
- Message msg = new Message(FNPSubscribeSucceeded);
- msg.set(UID, uid);
- msg.set(ORDERED_MESSAGE_NUM, orderedMessageNum);
- return msg;
- }
-
public static final MessageType FNPUnsubscribe = new
MessageType("FNPUnsubscribe") {{
addField(UID, Long.class);
}};
Modified: branches/publish-subscribe/freenet/src/freenet/node/Node.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/Node.java
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/src/freenet/node/Node.java
2005-10-21 13:19:36 UTC (rev 7442)
@@ -761,4 +761,9 @@
public ClientSubscription subscribe(ClientPublishStreamKey key,
SubscriptionCallback cb) {
return subscriptions.localSubscribe(key, cb);
}
+
+ /** Reject any requests made with the given ID. */
+ public synchronized void blockID(long newID) {
+ recentlyCompletedIDs.push(new Long(newID));
+ }
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
2005-10-21 13:19:36 UTC (rev 7442)
@@ -76,8 +76,6 @@
return handleInsertRequest(m);
} else if(spec == DMT.FNPSubscribeRequest) {
return node.subscriptions.handleSubscribeRequest(m);
- } else if(spec == DMT.FNPResubscribeRequest) {
- return node.subscriptions.handleResubscribeRequest(m);
} else if(spec == DMT.FNPPublishData) {
return node.subscriptions.handlePublishData(m);
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
2005-10-21 13:19:36 UTC (rev 7442)
@@ -79,6 +79,14 @@
Logger.error(this, "Could not send RNF to "+source);
}
} else {
+ // Accept
+ Message accept = DMT.createFNPAccepted(id);
+ try {
+ subscriber.sendAsync(accept, null);
+ } catch (NotConnectedException e1) {
+ Logger.minor(this, "Not connected while sending
Accepted");
+ return;
+ }
// Add self to parent
sub.addSubscriberHandler(this);
// FIXME: lock safety? Should do this off-thread?
@@ -105,7 +113,7 @@
if(subscribed)
msg = DMT.createFNPSubscribeSucceeded(origID, counter++, rootLoc);
else
- msg = DMT.createFNPSubscribeRestarted(origID, sub.getRestartUID(),
counter++);
+ msg = DMT.createFNPSubscribeRestarted(origID,
sub.getSubscribingUID(), counter++);
try {
subscriber.sendAsync(msg, null);
return true;
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
2005-10-21 13:17:13 UTC (rev 7441)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
2005-10-21 13:19:36 UTC (rev 7442)
@@ -10,7 +10,7 @@
import freenet.support.Logger;
/**
- * SubscriptionRequest sender. Analogous to RequestSender.
+ * SubscribeRequest sender. Analogous to RequestSender.
*/
public class SubscribeSender implements Runnable {
@@ -19,7 +19,7 @@
final double target;
private short htl;
final PeerNode source;
- final SubscriptionHandler handler;
+ final SubscribeSenderCallback handler;
final SubscribeHandler origSubscriber;
final double nearestSoFar;
private final HashSet nodesRoutedTo;
@@ -36,10 +36,11 @@
static final int REJECTED_OVERLOAD = 2;
/** An internal error occurred */
static final int INTERNAL_ERROR = 3;
- /** A node further up the tree is restarting its subscription */
+ /** A node further up the tree is restarting its subscription. We may be
called upon to be part of its search. */
static final int RESTARTING = 4;
static final int INITIAL_TIMEOUT = 5000;
+ static final int SEARCH_TIMEOUT = 120*1000;
static final int RESTART_TIMEOUT = 120*1000;
private int status;
@@ -60,14 +61,15 @@
* @param sub The SubscribeHandler which originated this SubscribeSender,
if any.
* Null if this is a locally originated subscription.
*/
- public SubscribeSender(long id, short htl, PeerNode source, double
nearestSoFar, SubscriptionHandler handler, SubscribeHandler sub) {
+ public SubscribeSender(long id, short htl, PeerNode source, double
nearestSoFar,
+ SubscribeSenderCallback handler, SubscribeHandler sub) {
this.id = id;
this.htl = htl;
this.source = source;
this.handler = handler;
this.nearestSoFar = nearestSoFar;
target = handler.targetLocation();
- node = handler.node;
+ node = handler.getNode();
status = SEARCHING;
nodesRoutedTo = new HashSet();
origSubscriber = sub;
@@ -117,16 +119,14 @@
* Error: RejectedOverload, RouteNotFound
*/
MessageFilter mfSubscribeSucceeded =
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
id).setField(DMT.ORDERED_MESSAGE_NUM,
counter).setType(DMT.FNPSubscribeSucceeded);
- MessageFilter mfSubscribeSucceededNewRoot =
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
id).setField(DMT.ORDERED_MESSAGE_NUM,
counter).setType(DMT.FNPSubscribeSucceededNewRoot);
MessageFilter mfRejectedOverload =
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
id).setType(DMT.FNPRejectedOverload);
MessageFilter mfRouteNotFound =
MessageFilter.create().setSource(next).setTimeout(RESTART_TIMEOUT).setField(DMT.UID,
id).setType(DMT.FNPRouteNotFound);
- MessageFilter mf =
mfSubscribeSucceeded.or(mfSubscribeSucceededNewRoot.or(mfRejectedOverload.or(mfRouteNotFound)));
- counter++;
+ MessageFilter mf =
mfSubscribeSucceeded.or(mfRejectedOverload.or(mfRouteNotFound));
try {
Message msg = node.usm.waitFor(mf);
-
+
if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
// :(
// Timeout or rejected:overload
@@ -135,7 +135,7 @@
setStatus(REJECTED_OVERLOAD);
return;
}
-
+
if(msg.getSpec() == DMT.FNPRouteNotFound) {
// It didn't find anywhere closer than us or someone behind us
:(
Logger.minor(this, msg.toString()+" in runPhase2Restarting");
@@ -144,7 +144,8 @@
return;
}
- if(msg.getSpec() == DMT.FNPSubscribeSucceeded || msg.getSpec() ==
DMT.FNPSubscribeSucceededNewRoot) {
+ if(msg.getSpec() == DMT.FNPSubscribeSucceeded) {
+ counter++;
// Success!
double rootLoc = msg.getDouble(DMT.ROOT_LOCATION);
if(rootLoc < 0.0 || rootLoc > 1.0) {
@@ -163,7 +164,7 @@
setStatus(SUCCESS);
return;
}
-
+
} catch (DisconnectedException e) {
Logger.minor(this, "Lost connection in restarting");
// Lost him
@@ -188,6 +189,7 @@
// restarting then failing?
MessageFilter mf =
MessageFilter.create().setType(DMT.FNPSubscribeRestarted).setField(DMT.UID,id).setSource(next).setTimeout(120*1000);
MessageFilter mfRejectedOverload =
MessageFilter.create().setSource(next).setTimeout(120*1000).setField(DMT.UID,
id).setType(DMT.FNPRejectedOverload);
+ mf = mf.or(mfRejectedOverload);
while(true) {
try {
@@ -219,7 +221,8 @@
}
/**
- * Find a node to subscribe to.
+ * Find a node to subscribe to. Wait until we are failed, RESTARTING or
+ * SUCCEEDED.
*/
private void runPhase1() {
while(true) {
@@ -247,8 +250,8 @@
nodesRoutedTo.add(next);
// Create message, including new counter
- counter = node.random.nextInt();
- Message msg = DMT.createFNPSubscribeRequest(id, htl, handler.key,
handler.getLastSeqNum(), nearestSoFar, counter);
+ counter = node.random.nextInt(); // new counter for this node
+ Message msg = DMT.createFNPSubscribeRequest(id, htl,
handler.getKey(), handler.getLastSeqNum(), nearestSoFar, counter);
try {
// Send it
@@ -257,42 +260,26 @@
Logger.normal(this, "Disconnected from "+next);
continue;
}
+
+ // Wait for Accepted
- // Wait for success/failure/etc
-
- /* What can happen?
- * RejectedLoop can happen (ID collision)
- * RejectedOverload can happen, maybe
- * RouteNotFound - yes, if we are closer than any downstream
- * SubscribeSucceeded can definitely happen (~= accepted)
- * SubscribeRestarted can definitely happen (~= accepted)
- * SubscribeSucceededNewRoot ??? probably not
- *
- * Anything else?
- */
-
- // Terminal - don't need a counter (no further contact with the
node)
+ MessageFilter mfAccepted =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPAccepted).setTimeout(INITIAL_TIMEOUT);
MessageFilter mfRejectedLoop =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPRejectedLoop).setTimeout(INITIAL_TIMEOUT);
MessageFilter mfRejectedOverload =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPRejectedOverload).setTimeout(INITIAL_TIMEOUT);
- MessageFilter mfRouteNotFound =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPRouteNotFound).setTimeout(INITIAL_TIMEOUT);
+
+ MessageFilter mf =
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload));
- // Non-terminal - DO need a counter
- MessageFilter mfSubscribeSucceeded =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPSubscribeSucceeded).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
counter);
- MessageFilter mfSubscribeRestarted =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPSubscribeRestarted).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
counter);
- counter++;
-
- MessageFilter mf =
mfRejectedLoop.or(mfRejectedOverload.or(mfSubscribeSucceeded.or(mfSubscribeRestarted.or(mfRouteNotFound))));
-
+ msg = null;
try {
- msg = node.usm.waitFor(mf);
- } catch (DisconnectedException e1) {
- Logger.normal(this, "Disconnected: "+next+" while waiting");
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e2) {
+ Logger.normal(this, "Disconnected: "+next+" while waiting for
accepted in "+this);
htl = node.decrementHTL(source, htl);
continue;
- }
+ }
if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
- // Timeout or rejected:overload
+ // Timeout or rejected:overload; fatal error
Logger.error(this, "Timeout or rejected:overload: "+msg+" for
"+id+" to "+next);
handler.senderRejectedOverload();
setStatus(REJECTED_OVERLOAD);
@@ -304,46 +291,145 @@
htl = node.decrementHTL(source, htl);
continue;
}
-
- if(msg.getSpec() == DMT.FNPSubscribeSucceeded) {
- // Success!
- double rootLoc = msg.getDouble(DMT.ROOT_LOCATION);
- if(rootLoc < 0.0 || rootLoc > 1.0) {
- Logger.error(this, "Invalid root loc: "+rootLoc+" from
"+next);
- msg = DMT.createFNPUnsubscribe(id, counter);
- try {
- next.sendAsync(msg, null);
- } catch (NotConnectedException e) {
- Logger.error(this,
"NotConnectedException sending "+msg+" to "+next);
- }
- continue;
- }
- // Validated success
- handler.subscribeSucceeded(next, rootLoc,
(short)(origSubscriber.origHTL - htl));
- setStatus(SUCCESS);
- break;
+
+ if(msg.getSpec() != DMT.FNPAccepted) {
+ Logger.error(this, "Unexpected message waiting for Accepted in
"+this+": "+msg);
+ continue;
}
- if(msg.getSpec() == DMT.FNPSubscribeRestarted) {
- long restartID = msg.getLong(DMT.RESTART_UID);
- handler.subscribeRestarted(next, restartID);
- setStatus(RESTARTING);
- break;
- }
+ // Accepted
- if(msg.getSpec() == DMT.FNPRouteNotFound) {
- // We are closer than they are
- // RNF does not carry a location because of false minima
- // It does however carry an HTL
- // But we ignore HTL > what we had before
- short newHTL = msg.getShort(DMT.HTL);
- Logger.debug(this, "RNF: "+msg+" my htl currently "+htl);
- if(htl > newHTL) htl = newHTL;
- // Try next node, if have any HTL left
- continue;
- }
-
- Logger.error(this, "Unrecognized message: "+msg+" in phase 1");
+ while (true) {
+
+ // Wait for success, restart, failure
+
+ /* 120 second timeout.
+ * What can happen?
+ *
+ * RejectedOverload (or timeout) - overload
(fatal)
+ * RouteNotFound - can't find it, and can't
become root
+ * SubscribeSucceeded - go to SUCCEEDED
+ * SubscribeRestarted - go to RESTARTING
+ * CoalesceNotify - make sure requests with
given ID are rejected
+ */
+
+ // Terminal - does not need a counter (no
further contact with the node)
+ MessageFilter mfRouteNotFound =
MessageFilter.create()
+ .setField(DMT.UID,
id).setSource(next).setType(
+
DMT.FNPRouteNotFound)
+ .setTimeout(SEARCH_TIMEOUT);
+ mfRejectedOverload.setTimeout(SEARCH_TIMEOUT);
+
+ // Non-terminal - DO need a counter
+ MessageFilter mfSubscribeSucceeded =
MessageFilter.create()
+ .setField(DMT.UID,
id).setSource(next).setType(
+
DMT.FNPSubscribeSucceeded).setTimeout(
+
SEARCH_TIMEOUT).setField(
+
DMT.ORDERED_MESSAGE_NUM, counter);
+ MessageFilter mfSubscribeRestarted =
MessageFilter.create()
+ .setField(DMT.UID,
id).setSource(next).setType(
+
DMT.FNPSubscribeRestarted).setTimeout(
+
SEARCH_TIMEOUT).setField(
+
DMT.ORDERED_MESSAGE_NUM, counter);
+
+ // Misc
+ MessageFilter mfCoalesceNotify =
MessageFilter.create()
+ .setField(DMT.UID,
id).setSource(next).setType(
+
DMT.FNPCoalesceNotify).setTimeout(
+ SEARCH_TIMEOUT);
+
+ mf = mfRejectedOverload.or(mfSubscribeSucceeded
+
.or(mfSubscribeRestarted.or(mfRouteNotFound
+
.or(mfCoalesceNotify))));
+
+ try {
+ msg = node.usm.waitFor(mf);
+ } catch (DisconnectedException e1) {
+ Logger.normal(this, "Disconnected: " +
next
+ + " while waiting in "
+ this);
+ htl = node.decrementHTL(source, htl);
+ break; // try next node
+ }
+
+ if (msg == null || msg.getSpec() ==
DMT.FNPRejectedOverload) {
+ // Timeout or rejected:overload
+ Logger.error(this, "Timeout or
rejected:overload: " + msg
+ + " for " + id + " to "
+ next);
+ handler.senderRejectedOverload();
+ setStatus(REJECTED_OVERLOAD);
+ return;
+ }
+
+ if (msg.getSpec() == DMT.FNPSubscribeSucceeded)
{
+ counter++;
+ // Success! Go to SUCCEEDED
+ double rootLoc =
msg.getDouble(DMT.ROOT_LOCATION);
+ if (rootLoc < 0.0 || rootLoc > 1.0) {
+ Logger.error(this, "Invalid
root loc: " + rootLoc
+ + " from " +
next);
+ msg =
DMT.createFNPUnsubscribe(id, counter);
+ try {
+ next.sendAsync(msg,
null);
+ } catch (NotConnectedException
e) {
+ Logger.error(this,
"NotConnectedException sending "
+ + msg +
" to " + next);
+ }
+ break; // try next node
+ }
+ // Validated success
+ if (!handler.subscribeSucceeded(next,
rootLoc,
+ (short)
(origSubscriber.origHTL - htl))) {
+ Logger.error(this,
+ "Rejected root
in successful subscription: "
+
+ rootLoc + " from " + next);
+ Message m =
DMT.createFNPUnsubscribe(id, counter);
+ try {
+ next.sendAsync(m, null);
+ } catch (NotConnectedException
e) {
+ // Unusual, but not
particularly interesting
+ Logger.minor(this,
+ "Lost
connection rejecting successful subscription to "
+
+ next + " on " + this);
+ }
+ }
+ setStatus(SUCCESS);
+ return;
+ }
+
+ if (msg.getSpec() == DMT.FNPSubscribeRestarted)
{
+ counter++;
+ long restartID =
msg.getLong(DMT.RESTART_UID);
+ handler.subscribeRestarted(next,
restartID);
+ setStatus(RESTARTING);
+ return;
+ }
+
+ if (msg.getSpec() == DMT.FNPRouteNotFound) {
+ // We are closer than they are
+ // RNF does not carry a location
because of false minima
+ // It does however carry an HTL
+ // But we ignore HTL > what we had
before
+ short newHTL = msg.getShort(DMT.HTL);
+ Logger.debug(this, "RNF: " + msg + " my
htl currently "
+ + htl);
+ if (htl > newHTL)
+ htl = newHTL;
+ // Try next node, if have any HTL left
+ break;
+ }
+
+ if (msg.getSpec() == DMT.FNPCoalesceNotify) {
+ long newID = msg.getLong(DMT.NEW_UID);
+ node.blockID(newID);
+ // FIXME: what if it has already
arrived? it will time out... so what?
+ Logger.minor(this, "Received coalesce
in " + this + " : "
+ + id + " -> " + newID);
+ continue; // more messages to come
+ }
+
+ Logger.error(this, "Unrecognized message: " +
msg
+ + " in phase 1");
+ }
}
}
Added:
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSenderCallback.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSenderCallback.java
2005-10-21 13:17:13 UTC (rev 7441)
+++
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSenderCallback.java
2005-10-21 13:19:36 UTC (rev 7442)
@@ -0,0 +1,51 @@
+package freenet.node;
+
+import freenet.keys.PublishStreamKey;
+
+/**
+ * Interface for SubscribeSender to call. Implemented by SubscriptionHandler,
+ * and others.
+ */
+interface SubscribeSenderCallback {
+
+ /** The key, as a double. */
+ double targetLocation();
+
+ /** The Node we are part of. */
+ Node getNode();
+
+ /** The restart UID on the SubscribeRestarted message recently received
*/
+ long getUpstreamRestartUID();
+
+ /** The node we routed to rejected the request due to overload. */
+ void senderRejectedOverload();
+
+ /** The node we routed to RNF'ed while it was RESTARTING. */
+ void senderRestartingSearchingRNF();
+
+ /** Got an invalid SubscribeSucceeded message while RESTARTING! */
+ void invalidSuccessInRestarting();
+
+ /** Success! @return True unless we reject the root location. */
+ boolean subscribeSucceeded(PeerNode next, double rootLoc, short
htlDiff);
+
+ /** Lost the connection to the parent node, while RESTARTING. */
+ void lostParentConnectionWasRestarting();
+
+ /** Got a SubscribeRestarted */
+ void subscribeRestarted(PeerNode next, long restartID);
+
+ /** Lost the connection to the parent node, while subscribed. */
+ void lostParentConnectionWasSuccessful();
+
+ /** Could not find the stream. */
+ void senderRNF();
+
+ /** Get the key being searched for */
+ PublishStreamKey getKey();
+
+ /** Get the seqno of the last packet we saw, or rather the last one that
+ * we don't want. */
+ long getLastSeqNum();
+
+}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
2005-10-21 13:17:13 UTC (rev 7441)
+++
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
2005-10-21 13:19:36 UTC (rev 7442)
@@ -21,7 +21,7 @@
* May have a parent node, or may be the root.
* May have many child nodes.
*/
-public class SubscriptionHandler {
+public class SubscriptionHandler implements SubscribeSenderCallback {
static final int KEEP_PACKETS = 32;
@@ -563,7 +563,7 @@
Logger.minor(this, "Rejected because orig
closer than root, but conn closed on "+id+" for "+this);
}
// There *may* be a more optimal route somewhere...
- maybeRestart();
+ maybeResubscribe();
}
try {
@@ -601,6 +601,7 @@
synchronized(this) {
if(subscribed) return;
if(sender != null) return;
+ subscribingUID = id;
sender = new SubscribeSender(id, htl, source, nearest, this, sub);
}
Thread t = new Thread(sender);
@@ -711,7 +712,7 @@
subscribing = true;
}
}
- startSubscribe(random.nextLong(), Node.MAX_HTL,
node.lm.getLocation().getValue(), null, null);
+ startSubscribe(subscribingUID, Node.MAX_HTL,
node.lm.getLocation().getValue(), null, null);
} finally {
if(restartingNow && sender == null)
subscribing = false;
@@ -811,7 +812,7 @@
* attempt with the next SubscribeHandler.
* @throws DroppingSubscriptionHandlerException
*/
- void senderRNF() {
+ public void senderRNF() {
Logger.minor(this, "Sender could not find stream on "+this);
boolean becomeRoot = false;
short htlDiff = 0;
@@ -891,7 +892,7 @@
* Forward it to the original requestor, and run another one if
possible.
* If this was a local request, tell all clients they are disconnected.
*/
- void senderRejectedOverload() {
+ public void senderRejectedOverload() {
ClientSubscriptionHandler[] clients;
synchronized(this) {
SubscribeHandler origSub = sender.origSubscriber;
@@ -917,7 +918,7 @@
}
/** Upstream restarted */
- void subscribeRestarted(PeerNode next, long restartID) {
+ public void subscribeRestarted(PeerNode next, long restartID) {
SubscribeHandler[] subscribers;
synchronized(this) {
parent = next;
@@ -945,17 +946,17 @@
* Got an RNF while upstream was restarting.
* Just send our old restart ID out.
*/
- void senderRestartingSearchingRNF() {
+ public void senderRestartingSearchingRNF() {
upstreamRestartFailed();
}
/** Lost connection to node which was restarting. */
- void lostParentConnectionWasRestarting() {
+ public void lostParentConnectionWasRestarting() {
upstreamRestartFailed();
}
/** Restarting would-be parent sent an invalid success message */
- void invalidSuccessInRestarting() {
+ public void invalidSuccessInRestarting() {
upstreamRestartFailed();
}
@@ -980,4 +981,12 @@
public long getUpstreamRestartUID() {
return upstreamRestartUID;
}
+
+ public Node getNode() {
+ return node;
+ }
+
+ public PublishStreamKey getKey() {
+ return key;
+ }
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
2005-10-21 13:17:13 UTC (rev 7441)
+++
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
2005-10-21 13:19:36 UTC (rev 7442)
@@ -155,8 +155,7 @@
/**
* Handle a SubscribeRequest.
- * @return True unless we want the message to be put back
- * onto the queue.
+ * @return True unless we want the message to be put back onto the queue.
*/
public boolean handleSubscribeRequest(Message m) {
// Do we have the stream in question, firstly?
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs