Author: toad
Date: 2005-10-13 17:33:49 +0000 (Thu, 13 Oct 2005)
New Revision: 7431
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
Log:
Now only need to implement the remaining actual callbacks.
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java	2005-10-13 16:20:26 UTC (rev 7430)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeHandler.java	2005-10-13 17:33:49 UTC (rev 7431)
@@ -100,7 +100,7 @@
* Send the current status to the source node.
* @return True unless we disconnected.
*/
- private boolean sendStatusMessage(boolean subscribed, double rootLoc) {
+ boolean sendStatusMessage(boolean subscribed, double rootLoc) {
Message msg;
if(subscribed)
msg = DMT.createFNPSubscribeSucceeded(origID, counter++, rootLoc);
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java	2005-10-13 16:20:26 UTC (rev 7430)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java	2005-10-13 17:33:49 UTC (rev 7431)
@@ -127,6 +127,7 @@
// :(
// Timeout or rejected:overload
Logger.error(this, "Timeout or rejected:overload in restarting: "+msg+" for "+id+" to "+next);
+ handler.senderRejectedOverload();
setStatus(REJECTED_OVERLOAD);
return;
}
@@ -134,6 +135,7 @@
if(msg.getSpec() == DMT.FNPRouteNotFound) {
// It didn't find anywhere closer than us or someone behind us :(
Logger.minor(this, msg.toString()+" in runPhase2Restarting");
+ handler.senderRestartingSearchingRNF();
setStatus(SEARCHING);
return;
}
@@ -186,11 +188,14 @@
continue;
} else if(msg.getSpec() == DMT.FNPSubscribeRestarted) {
Logger.debug(this, "Restarting");
+ long restartID = msg.getLong(DMT.RESTART_UID);
+ handler.subscribeRestarted(next, restartID);
setStatus(RESTARTING);
return;
} else if(msg.getSpec() == DMT.FNPRejectedOverload) {
Logger.debug(this, "Rejected: overload while in succeeded on "+this);
- setStatus(SEARCHING);
+ handler.senderRejectedOverload();
+ setStatus(REJECTED_OVERLOAD);
return;
}
Logger.error(this, "WTF?: "+msg);
@@ -211,6 +216,8 @@
while(true) {
// Check HTL > 0
if(htl <= 0) {
+ // REDFLAG: We use RNF here... even though really it's a DNF
+ handler.senderRNF();
setStatus(ROUTE_NOT_FOUND);
return;
}
@@ -223,6 +230,7 @@
if(next == null) {
// Backtrack
+ handler.senderRNF();
setStatus(ROUTE_NOT_FOUND);
return;
}
@@ -277,6 +285,7 @@
if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
// Timeout or rejected:overload
Logger.error(this, "Timeout or rejected:overload: "+msg+" for "+id+" to "+next);
+ handler.senderRejectedOverload();
setStatus(REJECTED_OVERLOAD);
return;
}
@@ -328,7 +337,6 @@
public void setStatus(int newStatus) {
Logger.debug(this, "Setting status to "+newStatus+" on "+this);
status = newStatus;
- handler.statusChange(this, status);
}
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java	2005-10-13 16:20:26 UTC (rev 7430)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java	2005-10-13 17:33:49 UTC (rev 7431)
@@ -29,6 +29,7 @@
final Node node;
final SubscriptionManager manager;
final PublishStreamKey key;
+ final double target;
final Random random;
// Small, rarely modified => []
private ClientSubscriptionHandler[] localSubscribers;
@@ -79,6 +80,7 @@
upstreamRestarting = false;
upstreamRestartUID = -1;
this.random = random;
+ target = key.toNormalizedDouble();
}
public synchronized boolean shouldDrop() {
@@ -122,12 +124,12 @@
data = packetData;
}
- /**
+ /**
* Forward this packet to a subscriber node.
* As an FNPSubscribeData. NOT an FNPPublishData.
* DO NOT CALL WITH LOCKS HELD, as sends packets.
* @param pn Node to send to.
- */
+ */
public void forwardTo(PeerNode pn) throws NotConnectedException {
Message msg = DMT.createFNPSubscribeData(key, packetNumber, data);
pn.sendAsync(msg, null);
@@ -239,13 +241,13 @@
PeerNode pn = sub.subscriber;
if(pn != ignore) {
try {
- item.forwardTo(pn);
+ item.forwardTo(pn);
} catch (NotConnectedException e) {
Logger.minor(this, "Lost connection to subscriber peer "+pn+" when forwarding packet "+item);
removeSubscriberHandler(sub);
}
+ }
}
- }
ClientSubscriptionHandler[] handlers;
synchronized(this) {
handlers = localSubscribers;
@@ -285,138 +287,189 @@
public void run() {
try {
- Logger.minor(this, "Running "+this);
- PeerNode myParent;
- while(true) {
- synchronized(this) {
- myParent = parent;
- if(restarting) myParent = null;
- if(upstreamRestarting) myParent = null;
- }
- if(myParent == null) {
- // We just became root, or we restarted
- handlePublishData(uid, source, origMessage);
- return;
- }
- // Send upwards
- Message msg = DMT.createFNPPublishData(contents, key,
packetNumber, uid);
- try {
- myParent.sendAsync(msg, null);
- } catch (NotConnectedException e) {
- lostConnection();
- return;
- }
- // Wait for reply
- MessageFilter mfSuccess =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataSucceeded);
- MessageFilter mfRestarting =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataRejectedRestarting);
- MessageFilter mfCollision =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataCollision);
- MessageFilter mfInvalid =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataInvalid);
- MessageFilter mfOverload =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPRejectedOverload);
- MessageFilter mfLoop =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPRejectedLoop);
-
- MessageFilter mf =
mfSuccess.or(mfRestarting.or(mfCollision.or(mfInvalid.or(mfOverload.or(mfLoop)))));
-
- Message reply;
-
- try {
- reply = source.node.usm.waitFor(mf);
- } catch (DisconnectedException e1) {
- lostConnection();
- return;
- }
-
- if(reply == null && !myParent.isConnected()) {
- Logger.error(this, "Did not get notified of parent
disconnection!");
- lostConnection();
- return;
- }
-
- if(reply == null || reply.getSpec() == DMT.FNPRejectedOverload) {
- // Timeout
- Logger.error(this, "Timeout - "+myParent+" did not respond in
"+TIMEOUT+"ms forwarding PublishData");
- try {
- source.sendAsync(DMT.createFNPRejectedOverload(uid), null);
- } catch (NotConnectedException e1) {
- Logger.normal(this, "Disconnected while sending rejected
due to overload: "+
- source+" ("+key+","+packetNumber+")");
- }
- return;
- }
-
- if(reply.getSpec() == DMT.FNPRejectedLoop) {
- synchronized(this) {
- if(myParent != parent) continue;
- }
- Logger.error(this, "Loop rejection in forwarding PublishData
to parent!");
- // Tree topology is completely broken
- forceSubscribe();
- try {
-
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid), null);
- } catch (NotConnectedException e) {
- Logger.normal(this, "Disconnected while sending rejection
due to restart due to loop: "+
- source+" ("+key+","+packetNumber+")");
- }
- return;
- }
-
- if(reply.getSpec() == DMT.FNPPublishDataInvalid) {
- // FIXME check it
- try {
- source.sendAsync(DMT.createFNPPublishDataInvalid(uid),
null);
- } catch (NotConnectedException e) {
- Logger.normal(this, "Disconnected while sending invalid
data rejection: "+
- source+" ("+key+","+packetNumber+")");
- }
- return;
- }
+ Logger.minor(this, "Running " + this);
+ PeerNode myParent;
+ while (true) {
+ synchronized (this) {
+ myParent = parent;
+ if (restarting)
+ myParent = null;
+ if (upstreamRestarting)
+ myParent = null;
+ }
+ if (myParent == null) {
+ // We just became root, or we
restarted
+ handlePublishData(uid, source,
origMessage);
+ return;
+ }
+ // Send upwards
+ Message msg =
DMT.createFNPPublishData(contents, key,
+ packetNumber, uid);
+ try {
+ myParent.sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ lostConnection();
+ return;
+ }
+ // Wait for reply
+ MessageFilter mfSuccess =
MessageFilter.create()
+
.setTimeout(TIMEOUT).setField(DMT.UID, uid)
+
.setSource(myParent).setMatchesDroppedConnection(
+
true).setType(DMT.FNPPublishDataSucceeded);
+ MessageFilter mfRestarting =
MessageFilter.create()
+
.setTimeout(TIMEOUT).setField(DMT.UID, uid)
+
.setSource(myParent).setMatchesDroppedConnection(
+
true).setType(
+
DMT.FNPPublishDataRejectedRestarting);
+ MessageFilter mfCollision =
MessageFilter.create()
+
.setTimeout(TIMEOUT).setField(DMT.UID, uid)
+
.setSource(myParent).setMatchesDroppedConnection(
+
true).setType(DMT.FNPPublishDataCollision);
+ MessageFilter mfInvalid =
MessageFilter.create()
+
.setTimeout(TIMEOUT).setField(DMT.UID, uid)
+
.setSource(myParent).setMatchesDroppedConnection(
+
true).setType(DMT.FNPPublishDataInvalid);
+ MessageFilter mfOverload =
MessageFilter.create()
+
.setTimeout(TIMEOUT).setField(DMT.UID, uid)
+
.setSource(myParent).setMatchesDroppedConnection(
+
true).setType(DMT.FNPRejectedOverload);
+ MessageFilter mfLoop =
MessageFilter.create().setTimeout(
+
TIMEOUT).setField(DMT.UID, uid).setSource(myParent)
+
.setMatchesDroppedConnection(true).setType(
+
DMT.FNPRejectedLoop);
- if(reply.getSpec() == DMT.FNPPublishDataRejectedRestarting) {
- // FIXME enforcement - has it recently said it's restarting?
- try {
-
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid), null);
- } catch (NotConnectedException e) {
- Logger.normal(this, "Disconnected while relaying
rejected:restarting: "+
- source+" ("+key+","+packetNumber+")");
- }
- return;
- }
+ MessageFilter mf =
mfSuccess.or(mfRestarting.or(mfCollision
+
.or(mfInvalid.or(mfOverload.or(mfLoop)))));
- if(reply.getSpec() == DMT.FNPPublishDataCollision) {
- // Yay finally a nontrivial failure mode!
- long suggested = reply.getLong(DMT.SUGGESTED_SEQNO);
- suggested = Math.max(suggested, packets.getLastNumber()+1);
- try {
- source.sendAsync(DMT.createFNPPublishDataCollision(uid,
suggested), null);
- } catch (NotConnectedException e) {
- Logger.normal(this, "Not connected while sending
collision: "+
- source+" ("+key+","+packetNumber+")");
- }
- return;
- }
-
- if(reply.getSpec() == DMT.FNPPublishDataSucceeded) {
- // Success!
- PacketItem item = new PacketItem(packetNumber, contents);
- packets.add(item);
- try {
- source.sendAsync(DMT.createFNPPublishDataSucceeded(uid),
null);
- } catch (NotConnectedException e) {
- Logger.normal(this, "Disconnected while adding new packet:
"+
- source+" ("+key+","+packetNumber+")");
- }
- broadcast(item, source);
- return;
- }
-
- Logger.error(this, "Unrecognized packet: "+reply);
- return;
- }
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t, t);
- } finally {
- source.node.unlockUID(uid);
- source.node.completed(uid);
- }
+ Message reply;
+
+ try {
+ reply =
source.node.usm.waitFor(mf);
+ } catch (DisconnectedException e1) {
+ lostConnection();
+ return;
+ }
+
+ if (reply == null &&
!myParent.isConnected()) {
+ Logger
+ .error(this,
+
"Did not get notified of parent disconnection!");
+ lostConnection();
+ return;
+ }
+
+ if (reply == null
+ || reply.getSpec() ==
DMT.FNPRejectedOverload) {
+ // Timeout
+ Logger.error(this, "Timeout - "
+ myParent
+ + " did not
respond in " + TIMEOUT
+ + "ms
forwarding PublishData");
+ try {
+ source.sendAsync(
+
DMT.createFNPRejectedOverload(uid), null);
+ } catch (NotConnectedException
e1) {
+ Logger.normal(this,
+
"Disconnected while sending rejected due to overload: "
+
+ source + " (" + key + ","
+
+ packetNumber + ")");
+ }
+ return;
+ }
+
+ if (reply.getSpec() ==
DMT.FNPRejectedLoop) {
+ synchronized (this) {
+ if (myParent != parent)
+ continue;
+ }
+ Logger.error(this,
+
"Loop rejection in forwarding PublishData to parent!");
+ // Tree topology is completely
broken
+ forceSubscribe();
+ try {
+
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid),
+
null);
+ } catch (NotConnectedException
e) {
+ Logger.normal(this,
+
"Disconnected while sending rejection due to restart due to loop: "
+
+ source + " (" + key + ","
+
+ packetNumber + ")");
+ }
+ return;
+ }
+
+ if (reply.getSpec() ==
DMT.FNPPublishDataInvalid) {
+ // FIXME check it
+ try {
+ source.sendAsync(DMT
+
.createFNPPublishDataInvalid(uid), null);
+ } catch (NotConnectedException
e) {
+ Logger.normal(this,
+
"Disconnected while sending invalid data rejection: "
+
+ source + " (" + key + ","
+
+ packetNumber + ")");
+ }
+ return;
+ }
+
+ if (reply.getSpec() ==
DMT.FNPPublishDataRejectedRestarting) {
+ // FIXME enforcement - has it
recently said it's
+ // restarting?
+ try {
+ source
+
.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid),
+
null);
+ } catch (NotConnectedException
e) {
+ Logger.normal(this,
+
"Disconnected while relaying rejected:restarting: "
+
+ source + " (" + key + ","
+
+ packetNumber + ")");
+ }
+ return;
+ }
+
+ if (reply.getSpec() ==
DMT.FNPPublishDataCollision) {
+ // Yay finally a nontrivial
failure mode!
+ long suggested =
reply.getLong(DMT.SUGGESTED_SEQNO);
+ suggested = Math.max(suggested,
+
packets.getLastNumber() + 1);
+ try {
+
source.sendAsync(DMT.createFNPPublishDataCollision(
+ uid,
suggested), null);
+ } catch (NotConnectedException
e) {
+ Logger.normal(this,
+ "Not
connected while sending collision: "
+
+ source + " (" + key + ","
+
+ packetNumber + ")");
+ }
+ return;
+ }
+
+ if (reply.getSpec() ==
DMT.FNPPublishDataSucceeded) {
+ // Success!
+ PacketItem item = new
PacketItem(packetNumber, contents);
+ packets.add(item);
+ try {
+ source.sendAsync(DMT
+
.createFNPPublishDataSucceeded(uid), null);
+ } catch (NotConnectedException
e) {
+ Logger.normal(this,
+
"Disconnected while adding new packet: "
+
+ source + " (" + key + ","
+
+ packetNumber + ")");
+ }
+ broadcast(item, source);
+ return;
+ }
+
+ Logger.error(this, "Unrecognized
packet: " + reply);
+ return;
+ }
+ } catch (Throwable t) {
+ Logger.error(this, "Caught " + t, t);
+ } finally {
+ source.node.unlockUID(uid);
+ source.node.completed(uid);
+ }
}
private void lostConnection() {
@@ -431,14 +484,16 @@
}
- /**
- * Handle a SubscribeRequest. Do not route it. Add the
- * node to the list of subscriber nodes and send it our status.
- * Caller should have checked ID for loops.
- * @param m The message to reply to.
- * @throws DroppingSubscriptionHandlerException If the node is currently being dropped
- * from the parent.
- */
+ /**
+ * Handle a SubscribeRequest. Do not route it. Add the node to the list
+ * of subscriber nodes and send it our status. Caller should have
+ * checked ID for loops.
+ *
+ * @param m
+ * The message to reply to.
+ * @throws DroppingSubscriptionHandlerException
+ * If the node is currently being dropped from the parent.
+ */
public void handleSubscribeRequest(Message m) throws DroppingSubscriptionHandlerException {
long id = m.getLong(DMT.UID);
short htl = m.getShort(DMT.HTL);
@@ -451,7 +506,6 @@
double origNearest = m.getDouble(DMT.NEAREST_LOCATION);
double nearest = origNearest;
double myLoc = source.node.lm.getLocation().getValue();
- double target = key.toNormalizedDouble();
if(Math.abs(myLoc-target) < Math.abs(nearest-target)) {
htl = Node.MAX_HTL;
nearest = myLoc;
@@ -460,6 +514,7 @@
}
boolean nowRestarting = false;
+ boolean nowRejectingOrigCloserThanRoot = false;
SubscribeHandler handler;
try {
synchronized(this) {
@@ -475,6 +530,14 @@
// Will tell client node current status
handler = new SubscribeHandler(node, id, htl, source,
orderedMessagesSeq, origNearest, nearest, this, items);
+ if(subscribed) {
+ if(Math.abs(target - rootLocation) > Math.abs(target - origNearest)) {
+ Logger.normal(this, "Rejecting subscription: our root "+rootLocation+" is further from target "+target+
+ " than their nearest-so-far: "+origNearest);
+ nowRejectingOrigCloserThanRoot = true;
+ }
+ }
+
if((!subscribed) && (!restarting)) {
restarting = true;
nowRestarting = true;
@@ -490,6 +553,19 @@
}
}
+ if(nowRejectingOrigCloserThanRoot) {
+ // htl 0 because of root
+ Message msg = DMT.createFNPRouteNotFound(id, (short)0);
+ try {
+ source.sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ // Will not happen often!
+ Logger.minor(this, "Rejected because orig closer than root, but conn closed on "+id+" for "+this);
+ }
+ // There *may* be a more optimal route somewhere...
+ maybeRestart();
+ }
+
try {
if(items != null && items.length > 0) {
for(int i=0;i<items.length;i++) {
@@ -642,19 +718,9 @@
}
}
- /**
- * Callback from SubscribeSender, indicating a change in status.
- * @param sender2
- * @param status
- */
- void statusChange(SubscribeSender sender2, int status) {
- // TODO Auto-generated method stub
-
- }
-
synchronized long getRestartUID() {
return restartUID;
- }
+ }
public synchronized double targetLocation() {
return key.toNormalizedDouble();
@@ -689,4 +755,37 @@
public long getLastSeqNum() {
return packets.getLastContiguousSeqNum();
}
+
+ /**
+ * A SubscribeRequest succeeded.
+ * @param next The node we are now subscribed through.
+ * @param rootLoc The tree root's location.
+ */
+ public void subscribeSucceeded(PeerNode next, double rootLoc) {
+ SubscribeHandler[] handlers;
+ synchronized(this) {
+ parent = next;
+ subscribed = true;
+ rootLocation = rootLoc;
+ handlers = subscriberPeerHandlers;
+ for(int i=0;i<handlers.length;i++) {
+ SubscribeHandler handler = handlers[i];
+ if(Math.abs(handler.origNearestLoc-target) < Math.abs(rootLoc-target)) {
+ // htl 0 because of root
+ Message msg = DMT.createFNPRouteNotFound(handler.origID, (short)0);
+ try {
+ handler.subscriber.sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ // Will not happen often!
+ Logger.minor(this, "Rejected because orig closer than root, but conn closed on "+handler.origID+" for "+this);
+ }
+ removeSubscriberHandler(handler);
+ continue;
+ } else {
+ handler.sendStatusMessage(true, rootLoc);
+ // Stays on list because is subscribed
+ }
+ // Stays on list because is subscribed
+ }
+ }
+ }
+ }
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs