Author: toad
Date: 2005-10-12 19:50:08 +0000 (Wed, 12 Oct 2005)
New Revision: 7425
Modified:
branches/publish-subscribe/freenet/.classpath
branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
branches/publish-subscribe/freenet/src/freenet/node/InsertSender.java
branches/publish-subscribe/freenet/src/freenet/node/Node.java
branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
branches/publish-subscribe/freenet/src/freenet/node/PacketSender.java
branches/publish-subscribe/freenet/src/freenet/node/PublishHandlerSender.java
branches/publish-subscribe/freenet/src/freenet/node/RequestSender.java
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
branches/publish-subscribe/freenet/src/freenet/support/NumberedRecentItems.java
Log:
Current status of publish/subscribe implementation, on a branch. Does not build!
The main freenet 0.7 repository is trunk/freenet, which does build.
Modified: branches/publish-subscribe/freenet/.classpath
===================================================================
--- branches/publish-subscribe/freenet/.classpath 2005-10-12 18:56:58 UTC
(rev 7424)
+++ branches/publish-subscribe/freenet/.classpath 2005-10-12 19:50:08 UTC
(rev 7425)
@@ -1,6 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
- <classpathentry
excluding="test/**|org/spaceroots/mantissa/random/MersenneTwisterTest.java"
kind="src" path="src"/>
+ <classpathentry
excluding="org/spaceroots/mantissa/random/MersenneTwisterTest.java|test/*"
kind="src" path="src"/>
<classpathentry kind="con"
path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry combineaccessrules="false" kind="src" path="/Contrib"/>
+ <classpathentry kind="lib" path="/usr/src/cvs/junit3.8.1.jar"/>
+ <classpathentry kind="lib"
path="/usr/src/cvs/freenet-stable/lib/freenet-ext.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Modified: branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -79,6 +79,12 @@
public static final String BLOCK_HEADERS = "blockHeaders";
public static final String DATA_INSERT_REJECTED_REASON =
"dataInsertRejectedReason";
public static final String STREAM_SEQNO = "streamSequenceNumber";
+ public static final String LAST_SEQNO = "lastSequenceNumber";
+ public static final String SUGGESTED_SEQNO = "suggestedSequenceNumber";
+ public static final String RESTART_UID = "restartUID";
+ public static final String MUST_BEAT_LOCATION = "mustBeatLocation";
+ public static final String ROOT_LOCATION = "rootLocation";
+ public static final String ORDERED_MESSAGE_NUM = "orderedMessageNum";
//Diagnostic
public static final MessageType ping = new MessageType("ping") {{
@@ -778,21 +784,17 @@
public static final MessageType FNPPublishData = new
MessageType("FNPPublishData") {{
addField(UID, Long.class);
- addField(HTL, Short.class);
addField(DATA, ShortBuffer.class);
addField(KEY, PublishStreamKey.class);
addField(STREAM_SEQNO, Long.class);
- addField(NEAREST_LOCATION, Double.class);
}};
- public static final Message createFNPPublishData(short htl, byte[] data,
PublishStreamKey key, long seqNo, long id, double nearestLoc) {
+ public static final Message createFNPPublishData(byte[] data,
PublishStreamKey key, long seqNo, long id) {
Message msg = new Message(FNPPublishData);
- msg.set(HTL, htl);
msg.set(KEY, key);
msg.set(DATA, new ShortBuffer(data));
msg.set(STREAM_SEQNO, seqNo);
msg.set(UID, id);
- msg.set(NEAREST_LOCATION, nearestLoc);
return msg;
}
@@ -830,6 +832,120 @@
return msg;
}
+ public static final MessageType FNPPublishDataRejectedRestarting = new
MessageType("FNPPublishDataRejectedRestarting") {{
+ addField(UID, Long.class);
+ }};
+
+ public static final Message createFNPPublishDataRejectedRestarting(long
id) {
+ Message msg = new Message(FNPPublishDataRejectedRestarting);
+ msg.set(UID, id);
+ return msg;
+ }
+
+ public static final MessageType FNPPublishDataNotSubscribed = new
MessageType("FNPPublishDataNotSubscribed") {{
+ addField(UID, Long.class);
+ }};
+
+ public static final Message createFNPPublishDataNotSubscribed(long uid) {
+ Message msg = new Message(FNPPublishDataNotSubscribed);
+ msg.set(UID, uid);
+ return msg;
+ }
+
+ public static final MessageType FNPSubscribeRequest = new
MessageType("FNPSubscribeRequest") {{
+ addField(UID, Long.class);
+ addField(HTL, Short.class);
+ addField(KEY, PublishStreamKey.class);
+ addField(LAST_SEQNO, Long.class);
+ addField(NEAREST_LOCATION, Double.class);
+ addField(ORDERED_MESSAGE_NUM, Integer.class);
+ }};
+
+ public static final Message createFNPSubscribeRequest(long uid, short htl,
PublishStreamKey key, long lastSeqNum, double nearestSoFar, int orderedSeqNo) {
+ Message msg = new Message(FNPSubscribeRequest);
+ msg.set(UID, uid);
+ msg.set(HTL, htl);
+ msg.set(KEY, key);
+ msg.set(LAST_SEQNO, lastSeqNum);
+ msg.set(NEAREST_LOCATION, nearestSoFar);
+ msg.set(ORDERED_MESSAGE_NUM, orderedSeqNo);
+ return msg;
+ }
+
+ public static final MessageType FNPResubscribeRequest = new
MessageType("FNPResubscribeRequest") {{
+ addField(RESTART_UID, Long.class);
+ addField(HTL, Short.class);
+ addField(KEY, PublishStreamKey.class);
+ addField(LAST_SEQNO, Long.class);
+ addField(NEAREST_LOCATION, Double.class);
+ addField(MUST_BEAT_LOCATION, Double.class);
+ }};
+
+ public static final Message createFNPResubscribeRequest(long restartUID,
short htl, PublishStreamKey key, long lastSeqNum, double nearestSoFar, double
mustBeatLocation) {
+ Message msg = new Message(FNPResubscribeRequest);
+ msg.set(RESTART_UID, restartUID);
+ msg.set(HTL, htl);
+ msg.set(KEY, key);
+ msg.set(LAST_SEQNO, lastSeqNum);
+ msg.set(NEAREST_LOCATION, nearestSoFar);
+ msg.set(MUST_BEAT_LOCATION, mustBeatLocation);
+ return msg;
+ }
+
+ public static final MessageType FNPSubscribeRestarted = new
MessageType("FNPSubscribeRestarted") {{
+ addField(UID, Long.class); // if it is a reply to a SubscribeRequest,
we need to be able to identify it,
+ // if only because of possible race conditions.
+
+ addField(RESTART_UID, Long.class);
+ addField(ORDERED_MESSAGE_NUM, Integer.class);
+ }};
+
+ public static final Message createFNPSubscribeRestarted(long uid, long
restartUID, int orderedMessageNum) {
+ Message msg = new Message(FNPSubscribeRestarted);
+ msg.set(UID, uid);
+ msg.set(RESTART_UID, restartUID);
+ msg.set(MUST_BEAT_LOCATION, orderedMessageNum);
+ return msg;
+ }
+
+ public static final MessageType FNPSubscribeSucceeded = new
MessageType("FNPSubscribeSucceeded") {{
+ addField(ORDERED_MESSAGE_NUM, Integer.class);
+ addField(UID, Long.class);
+ addField(ROOT_LOCATION, Double.class);
+ }};
+
+ public static final Message createFNPSubscribeSucceeded(long uid, int
orderedMessageNum, double rootLoc) {
+ Message msg = new Message(FNPSubscribeSucceeded);
+ msg.set(UID, uid);
+ msg.set(ORDERED_MESSAGE_NUM, orderedMessageNum);
+ msg.set(ROOT_LOCATION, rootLoc);
+ return msg;
+ }
+
+ public static final MessageType FNPSubscribeSucceededNewRoot = new
MessageType("FNPSubscribeSucceededNewRoot") {{
+ addField(ORDERED_MESSAGE_NUM, Integer.class);
+ addField(UID, Long.class);
+ }};
+
+ public static final Message createFNPSubscribeSucceededNewRoot(long uid,
PublishStreamKey key, int orderedMessageNum) {
+ Message msg = new Message(FNPSubscribeSucceeded);
+ msg.set(UID, uid);
+ msg.set(ORDERED_MESSAGE_NUM, orderedMessageNum);
+ return msg;
+ }
+
+ public static final MessageType FNPPublishDataCollision = new
MessageType("FNPPublishDataCollision") {{
+ addField(UID, Long.class);
+ addField(SUGGESTED_SEQNO, Long.class);
+ }};
+
+ public static final Message createFNPPublishDataCollision(long id, long
seqNo) {
+ Message msg = new Message(FNPPublishDataCollision);
+ msg.set(UID, id);
+ msg.set(SUGGESTED_SEQNO, seqNo);
+ return msg;
+ }
+
// public static final MessageType FNPSubscribeRequest = new
MessageType("FNPSubscribeRequest") {{
// addField(UID, Long.class);
// addField(HTL, Short.class);
Modified:
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
2005-10-12 18:56:58 UTC (rev 7424)
+++
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -14,13 +14,13 @@
final ClientPublishStreamKey key;
- final SubscriptionManager sm;
+ final SubscriptionHandler handler;
boolean finished = false;
private final LinkedList clientSubs;
- ClientSubscriptionHandler(SubscriptionManager manager,
ClientPublishStreamKey key) {
+ ClientSubscriptionHandler(SubscriptionHandler sh, ClientPublishStreamKey
key) {
clientSubs = new LinkedList();
- sm = manager;
+ handler = sh;
this.key = key;
}
@@ -34,7 +34,7 @@
clientSubs.remove(sub);
if(clientSubs.size() == 0) {
finished = true;
- sm.remove(this);
+ handler.remove(this);
}
}
Modified: branches/publish-subscribe/freenet/src/freenet/node/InsertSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/InsertSender.java
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/node/InsertSender.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -27,6 +27,15 @@
this.prb = prb;
this.fromStore = fromStore;
this.closestLocation = closestLocation;
+
+ double myLoc = node.lm.getLocation().getValue();
+ if(Math.abs(target-myLoc) < Math.abs(target-closestLocation)) {
+ closestLocation = myLoc;
+ htl = Node.MAX_HTL;
+ } else {
+ htl = node.decrementHTL(source, htl);
+ }
+
Thread t = new Thread(this, "InsertSender for UID "+uid+" on
"+node.portNumber);
t.setDaemon(true);
t.start();
@@ -76,13 +85,8 @@
// Route it
PeerNode next;
// Can backtrack, so only route to nodes closer than we are to
target.
- double nextValue;
synchronized(node.peers) {
next = node.peers.closerPeer(source, nodesRoutedTo, target,
true);
- if(next != null)
- nextValue = next.getLocation().getValue();
- else
- nextValue = -1.0;
}
if(next == null) {
@@ -93,11 +97,6 @@
Logger.minor(this, "Routing insert to "+next);
nodesRoutedTo.add(next);
- if(Math.abs(target - nextValue) > Math.abs(target -
closestLocation)) {
- Logger.minor(this, "Backtracking: target="+target+"
next="+nextValue+" closest="+closestLocation);
- htl = node.decrementHTL(source, htl);
- }
-
Message req = DMT.createFNPInsertRequest(uid, htl, myKey,
closestLocation);
// Wait for ack or reject... will come before even a locally
generated DataReply
@@ -132,6 +131,7 @@
}
if(msg.getSpec() == DMT.FNPRejectedLoop) {
+ htl = node.decrementHTL(source, htl);
// Loop - we don't want to send the data to this one
continue;
}
@@ -208,6 +208,7 @@
if(msg.getSpec() == DMT.FNPRouteNotFound) {
Logger.minor(this, "Rejected: RNF");
// Still gets the data - but not yet
+ htl = node.decrementHTL(source, htl);
short newHtl = msg.getShort(DMT.HTL);
if(htl > newHtl) htl = newHtl;
continue;
Modified: branches/publish-subscribe/freenet/src/freenet/node/Node.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/Node.java
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/node/Node.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -57,8 +57,8 @@
public static final int PACKETS_IN_BLOCK = 32;
public static final int PACKET_SIZE = 1024;
- public static final double DECREMENT_AT_MIN_PROB = 0.2;
- public static final double DECREMENT_AT_MAX_PROB = 0.1;
+ public static final double DECREMENT_AT_MIN_PROB = 0.25;
+ public static final double DECREMENT_AT_MAX_PROB = 0.5;
// Send keepalives every 2.5-5.0 seconds
public static final int KEEPALIVE_INTERVAL = 2500;
// If no activity for 15 seconds, node is dead
@@ -96,6 +96,8 @@
private final Hashtable localStreamContexts;
private final HashSet runningUIDs;
+ final LRUQueue recentlyCompletedIDs;
+ static final int MAX_RECENTLY_COMPLETED_IDS = 10*1000;
byte[] myIdentity; // FIXME: simple identity block; should be unique
/** Hash of identity. Used as setup key. */
@@ -111,7 +113,7 @@
final PacketSender ps;
final NodeDispatcher dispatcher;
final String filenamesPrefix;
- static short MAX_HTL = 10;
+ static short MAX_HTL = 5;
private static final int EXIT_STORE_FILE_NOT_FOUND = 1;
private static final int EXIT_STORE_IOEXCEPTION = 2;
private static final int EXIT_STORE_OTHER = 3;
@@ -674,9 +676,6 @@
// FIXME support compression when noderefs get big enough for it to be
useful
}
- final LRUQueue recentlyCompletedIDs;
- static final int MAX_RECENTLY_COMPLETED_IDS = 10*1000;
-
/**
* Has a request completed with this ID recently?
*/
Modified:
branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/node/NodeDispatcher.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -74,27 +74,18 @@
return handleDataRequest(m);
} else if(spec == DMT.FNPInsertRequest) {
return handleInsertRequest(m);
+ } else if(spec == DMT.FNPSubscribeRequest) {
+ return node.subscriptions.handleSubscribeRequest(m);
+ } else if(spec == DMT.FNPResubscribeRequest) {
+ return node.subscriptions.handleResubscribeRequest(m);
} else if(spec == DMT.FNPPublishData) {
- return handlePublishData(m);
+ return node.subscriptions.handlePublishData(m);
}
// } // SubscribeData, SubscribeRestarted etc handled by
SubscribeSender.
return false;
}
/**
- * Handle an FNPPublishData message.
- * @return False to put it back onto the queue.
- */
- private boolean handlePublishData(Message m) {
- // Create a PublishSender. This will do all the work.
- // FIXME maybe we should check whether we've sent the packet before?
- // It's not really a viable DoS but it is good practice...
- PublishHandlerSender ps =
- node.makePublishHandlerSender(m);
- return true;
- }
-
- /**
* Handle an incoming FNPDataRequest.
*/
private boolean handleDataRequest(Message m) {
Modified: branches/publish-subscribe/freenet/src/freenet/node/PacketSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/PacketSender.java
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/node/PacketSender.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -51,13 +51,14 @@
continue;
}
- // Any urgent notifications to send?
- long urgentTime = pn.getNextUrgentTime();
- if(urgentTime <= now) {
- // Send them
- pn.sendAnyUrgentNotifications();
- } else {
- nextActionTime = Math.min(nextActionTime,
urgentTime);
+ if(node.packetMangler == null) continue;
+ // Any messages to send?
+ MessageItem[] messages = null;
+ messages = pn.grabQueuedMessageItems();
+ if(messages != null) {
+ // Send packets, right now, blocking, including
any active notifications
+
node.packetMangler.processOutgoingOrRequeue(messages, pn, true);
+ continue;
}
// Any packets to resend?
@@ -88,14 +89,13 @@
}
- if(node.packetMangler == null) continue;
- // Any messages to send?
- MessageItem[] messages = null;
- messages = pn.grabQueuedMessageItems();
- if(messages != null) {
- // Send packets, right now, blocking, including
any active notifications
-
node.packetMangler.processOutgoingOrRequeue(messages, pn, true);
- continue;
+ // Any urgent notifications to send?
+ long urgentTime = pn.getNextUrgentTime();
+ if(urgentTime <= now) {
+ // Send them
+ pn.sendAnyUrgentNotifications();
+ } else {
+ nextActionTime = Math.min(nextActionTime,
urgentTime);
}
// Need to send a keepalive packet?
Modified:
branches/publish-subscribe/freenet/src/freenet/node/PublishHandlerSender.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/PublishHandlerSender.java
2005-10-12 18:56:58 UTC (rev 7424)
+++
branches/publish-subscribe/freenet/src/freenet/node/PublishHandlerSender.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -78,8 +78,14 @@
closestLocation = m.getDouble(DMT.NEAREST_LOCATION);
htl = m.getShort(DMT.HTL);
double myLoc = n.lm.getLocation().getValue();
- if(Math.abs(myLoc - target) < Math.abs(closestLocation - target))
+ if(Math.abs(myLoc - target) < Math.abs(closestLocation - target)) {
closestLocation = myLoc;
+ // got new closest location; if reset location, reset HTL
+ htl = Node.MAX_HTL;
+ } else {
+ // backtracking
+ htl = node.decrementHTL(source, htl);
+ }
t = new Thread(this);
t.setDaemon(true);
t.start();
@@ -92,7 +98,7 @@
// Copy it as it will be decrypted in place
byte[] packetDataCopy = new byte[packetData.length];
System.arraycopy(packetData, 0, packetDataCopy, 0,
packetData.length);
- node.subscriptions.receivedPacket(key, packetNumber,
packetDataCopy, source);
+ //node.subscriptions.receivedPacket(key, packetNumber,
packetDataCopy, source);
short origHTL = htl;
if(source != null) {
Message msg = DMT.createFNPAccepted(uid);
@@ -152,11 +158,6 @@
Logger.minor(this, "Routing insert to "+next);
nodesRoutedTo.add(next);
- if(Math.abs(target - nextValue) > Math.abs(target -
closestLocation)) {
- Logger.minor(this, "Backtracking: target="+target+"
next="+nextValue+" closest="+closestLocation);
- htl = node.decrementHTL(source, htl);
- }
-
// Now we send it
/**
* Possible responses:
@@ -165,7 +166,7 @@
* RejectedLoop
* (timeout)
*/
- Message msg = DMT.createFNPPublishData(htl, packetData, key,
packetNumber, uid, closestLocation);
+ Message msg = DMT.createFNPPublishData(packetData, key,
packetNumber, uid);
MessageFilter mfAccepted =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
MessageFilter mfRejectedLoop =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
Modified: branches/publish-subscribe/freenet/src/freenet/node/RequestSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/RequestSender.java
2005-10-12 18:56:58 UTC (rev 7424)
+++ branches/publish-subscribe/freenet/src/freenet/node/RequestSender.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -74,6 +74,14 @@
this.nearestLoc = nearestLoc;
target = key.toNormalizedDouble();
+ double myLoc = node.lm.getLocation().getValue();
+ if(Math.abs(myLoc-target) < Math.abs(nearestLoc-target)) {
+ nearestLoc = myLoc;
+ htl = Node.MAX_HTL;
+ } else {
+ htl = node.decrementHTL(source, htl);
+ }
+
Thread t = new Thread(this, "RequestSender for UID "+uid);
t.setDaemon(true);
t.start();
@@ -87,22 +95,14 @@
while(true) {
Logger.minor(this, "htl="+htl);
if(htl == 0) {
- // RNF
- // Would be DNF if arrived with no HTL
- // But here we've already routed it and that's been rejected.
- finish(ROUTE_NOT_FOUND);
+ finish(DATA_NOT_FOUND);
return;
}
// Route it
PeerNode next;
- double nextValue;
synchronized(node.peers) {
next = node.peers.closerPeer(source, nodesRoutedTo, target,
true);
- if(next != null)
- nextValue = next.getLocation().getValue();
- else
- nextValue = -1.0;
}
if(next == null) {
@@ -113,11 +113,6 @@
Logger.minor(this, "Routing insert to "+next);
nodesRoutedTo.add(next);
- if(Math.abs(target - nextValue) > Math.abs(target - nearestLoc)) {
- Logger.minor(this, "Backtracking: target="+target+"
next="+nextValue+" closest="+nearestLoc);
- htl = node.decrementHTL(source, htl);
- }
-
Message req = DMT.createFNPDataRequest(uid, htl, key, nearestLoc);
/**
@@ -155,6 +150,7 @@
}
if(msg.getSpec() == DMT.FNPRejectedLoop) {
+ htl = node.decrementHTL(source, htl);
// Find another node to route to
continue;
}
@@ -195,9 +191,15 @@
}
if(msg.getSpec() == DMT.FNPRouteNotFound) {
- // Backtrack within available hops
- short newHtl = msg.getShort(DMT.HTL);
- if(newHtl < htl) htl = newHtl;
+ // Backtrack within available hops.
+ // We DO NOT include a nearestLoc on an RNF.
+ // The reason for this is that it would suck,
+ // as we *probably* haven't actually gotten any
+ // closer to the target.
+ short newHTL = msg.getShort(DMT.HTL);
+ short oldHTL = node.decrementHTL(source, htl);
+ htl = oldHTL;
+ if(newHTL < oldHTL) htl = newHTL;
continue;
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
2005-10-12 18:56:58 UTC (rev 7424)
+++
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -1,9 +1,20 @@
package freenet.node;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Random;
+
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.keys.ClientPublishStreamKey;
import freenet.keys.PublishStreamKey;
import freenet.support.Logger;
import freenet.support.NumberedItem;
import freenet.support.NumberedRecentItems;
+import freenet.support.ShortBuffer;
/**
* A single subscription.
@@ -14,108 +25,668 @@
static final int KEEP_PACKETS = 32;
+ /** Our Node */
+ final Node node;
+ final SubscriptionManager manager;
final PublishStreamKey key;
- ClientSubscriptionHandler localSubscribers;
- PeerNode[] subscriberPeers;
- final NumberedRecentItems packets;
+ final Random random;
+ // Small, rarely modified => []
+ private ClientSubscriptionHandler[] localSubscribers;
+ /** Nodes which have subscribed to the stream through us, whether
+ * or not we are currently subscribed. Also records the HTL, ID
+ * etc they used, so that if one subscription fails, we can try
+ * the next.
+ */
+ private SubscribeHandler[] subscriberPeerHandlers;
+ private PeerNode parent;
+ private final NumberedRecentItems packets;
+ /** At most one concurrent SubscribeRequest being routed */
+ private SubscribeSender sender;
+ /** Are we currently subscribed? */
+ private boolean subscribed;
+ /** Are we currently restarting? */
+ private boolean restarting;
+ /** Our restart UID */
+ private long restartUID;
+ /** Is upstream restarting? */
+ private boolean upstreamRestarting;
+ /** Upstream's restart UID */
+ private long upstreamRestartUID;
+ /** Removing from parent? */
+ private boolean dropping = false;
+ /** Location of root (root = self, if we are root) */
+ private double rootLocation;
- public SubscriptionHandler(PublishStreamKey k) {
+ /**
+ * On creation, we set restarting = true,
+ * but we do not actually start routing a SubscribeRequest,
+ * because we do not yet have the parameters to do so.
+ * We expect to be fed them by either a call to subscribe(), or
+ * a call to handleSubscriptionRequest().
+ * @param k
+ */
+ SubscriptionHandler(SubscriptionManager mangler, PublishStreamKey k,
Random random) {
key = k;
- subscriberPeers = null;
+ node = mangler.node;
+ manager = mangler;
+ subscriberPeerHandlers = null;
+ parent = null;
packets = new NumberedRecentItems(KEEP_PACKETS, true);
+ sender = null;
+ subscribed = false;
+ restarting = false;
+ // Will be enabled when we actually start restarting
+ upstreamRestarting = false;
+ upstreamRestartUID = -1;
+ this.random = random;
}
+ public synchronized boolean shouldDrop() {
+ return (subscriberPeerHandlers == null ||
subscriberPeerHandlers.length == 0) &&
+ (localSubscribers == null || localSubscribers.length == 0);
+ }
+
/**
- * Set the local subscribers handler.
- * @return The previous local subscribers handler - should be null!
+ * Remove a node from the list of subscribers.
+ * Can be called while dropping.
+ * @param pn The node to remove.
*/
- public synchronized ClientSubscriptionHandler
setLocal(ClientSubscriptionHandler csh) {
- ClientSubscriptionHandler h = localSubscribers;
- localSubscribers = csh;
- return h;
+ void removeSubscriberHandler(SubscribeHandler handler) {
+ Logger.minor(this, "Removing subscriber node "+handler+" from "+this);
+ synchronized(this) {
+ if(subscriberPeerHandlers == null) return;
+ int count = 0;
+ for(int i=0;i<subscriberPeerHandlers.length;i++) {
+ if(!(subscriberPeerHandlers[i] == handler ||
subscriberPeerHandlers[i] == null))
+ count++;
+ }
+ SubscribeHandler[] newPeers = new SubscribeHandler[count];
+ int x=0;
+ for(int i=0;i<subscriberPeerHandlers.length;i++) {
+ if(!(subscriberPeerHandlers[i] == handler ||
subscriberPeerHandlers[i] == null))
+ newPeers[x++] = subscriberPeerHandlers[i];
+ }
+ subscriberPeerHandlers = newPeers;
+ if(!shouldDrop()) return;
+ }
+ drop();
}
- public synchronized boolean shouldDrop() {
- return subscriberPeersEmpty() && localSubscribers == null;
+ public class PacketItem implements NumberedItem {
+
+ long packetNumber;
+ byte[] data;
+
+ public PacketItem(long packetNumber, byte[] packetData) {
+ this.packetNumber = packetNumber;
+ data = packetData;
}
/**
- * @return True if there are no subscriber peers.
+ * Forward this packet to a subscriber node.
+ * As an FNPSubscribeData. NOT an FNPPublishData.
+ * DO NOT CALL WITH LOCKS HELD, as sends packets.
+ * @param pn Node to send to.
*/
- private final boolean subscriberPeersEmpty() {
- PeerNode[] peers = subscriberPeers;
- return (peers == null || peers.length == 0);
+ public void forwardTo(PeerNode pn) throws NotConnectedException {
+ Message msg = DMT.createFNPSubscribeData(key, packetNumber, data);
+ pn.sendAsync(msg, null);
}
- public void addSubscriberNode(PeerNode pn, long lastSeen) {
+ public long getNumber() {
+ return packetNumber;
+ }
+
+ }
+
+ /**
+ * Handle a PublishData packet:
+ * <li>If we have the same packet, with the
+ * same contents, return FNPPublishSucceeded.</li>
+ * <li>If we have the same packet with
+ * different contents, return FNPPublishCollision with a suggested
+ * next packet number.</li>
+ * <li>If we are not the root, forward to our parent, and wait
+ * for FNPPublishSucceeded or FNPPublishCollision.</li>
+ * <li>If we are the root, and we don't have that packet number,
+ * accept it; FNPPublishSucceeded. Then broadcast it to all other
+ * nodes on the tree.</li>
+ * @return True unless we want the message to be put back
+ * onto the queue.
+ */
+ public boolean handlePublishData(long id, PeerNode source, Message m)
throws DroppingSubscriptionHandlerException {
+ long packetNumber = m.getLong(DMT.PACKET_NO);
+ byte[] data = ((ShortBuffer) m.getObject(DMT.DATA)).getData();
+ PacketItem item;
+ boolean isRoot;
+ boolean isRestarting;
+ PacketItem myItem = null;
synchronized(this) {
- if(subscriberPeers == null || subscriberPeers.length == 0) {
- subscriberPeers = new PeerNode[] { pn };
+ if(dropping) throw new DroppingSubscriptionHandlerException();
+ item = (PacketItem) packets.get(packetNumber);
+ isRoot = isRoot();
+ isRestarting = restarting || upstreamRestarting;
+ if(isRoot && item == null) {
+ myItem = new PacketItem(packetNumber, data);
+ packets.add(myItem);
+ }
+ }
+ if(item != null) {
+ // Equal contents?
+ if(Arrays.equals(item.data, data)) {
+ try {
+ // Already have the packet; success
+ source.sendAsync(DMT.createFNPPublishDataSucceeded(id),
null);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Not connected while sending success
because already have packet to "+
+ source+" ("+key+","+packetNumber+")");
+ }
} else {
- PeerNode[] peers = new PeerNode[subscriberPeers.length+1];
- System.arraycopy(subscriberPeers, 0, peers, 0,
subscriberPeers.length);
- peers[peers.length-1] = pn;
- subscriberPeers = peers;
+ // Collision
+ try {
+ source.sendAsync(DMT.createFNPPublishDataCollision(id,
packets.getLastNumber()+1), null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Not connected while sending
collision: "+
+ source+" ("+key+","+packetNumber+")");
}
}
- NumberedItem[] items = packets.getAfter(lastSeen);
- if(items == null || items.length == 0) return;
- for(int i=0;i<items.length;i++) {
- PacketItem item = (PacketItem)items[i];
+ } else {
+ if(isRestarting) {
+ try {
+
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(id), null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Disconnected while sending rejection
due to restart: "+
+ source+" ("+key+","+packetNumber+")");
+ }
+ } else if(isRoot) {
+ // Added new item already, success
+ try {
+ source.sendAsync(DMT.createFNPPublishDataSucceeded(id),
null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Disconnected while adding new packet:
"+
+ source+" ("+key+","+packetNumber+")");
+ }
+ // Send to all other nodes; those below source will send when
get success message
+ broadcast(myItem, source);
+ } else {
+ // Forward to parent and wait for success/failure - off thread!
+ ForwardPublishUpTree forwarder = new ForwardPublishUpTree(id,
source, packetNumber, data, m);
+ Thread t = new Thread(forwarder);
+ t.setDaemon(true); // FIXME should this block exit?
+ t.start();
+ return true; // don't unlockUID yet
+ }
+ }
+ // In all cases...
+ source.node.unlockUID(id);
+ return true;
+ }
+
+ /**
+ * Forward a PacketItem to all subscriber peers, except for the one given.
+ * The item must already have been added to the packets cache.
+ * Also sends to any local subscribers.
+ */
+ private void broadcast(PacketItem item, PeerNode ignore) {
+ SubscribeHandler[] peers;
+ synchronized(this) {
+ peers = subscriberPeerHandlers;
+ }
+ if(peers == null) return;
+ for(int i=0;i<peers.length;i++) {
+ SubscribeHandler sub = peers[i];
+ if(sub == null) continue;
+ PeerNode pn = sub.subscriber;
+ if(pn != ignore) {
+ try {
item.forwardTo(pn);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Lost connection to subscriber peer
"+pn+" when forwarding packet "+item);
+ removeSubscriberHandler(sub);
+ }
}
}
+ ClientSubscriptionHandler[] handlers;
+ synchronized(this) {
+ handlers = localSubscribers;
+ }
+ if(handlers != null) {
+ for(int i=0;i<handlers.length;i++)
+ handlers[i].processPacket(item.packetNumber, item.data);
+ }
+ }
+
+ private synchronized boolean isRoot() {
+ return parent == null && !restarting;
+ }
/**
- * Process an incoming PublishData packet.
+ * Forward a PublishData to our parent.
+ * Wait for a PublishDataSucceeded or a PublishDataCollision.
+ * If the former, broadcast. If the latter, return to sender.
*/
- public void processPacket(long packetNumber, byte[] packetData, PeerNode
source) {
- // First, have we seen it before?
- PacketItem item = new PacketItem(packetNumber, packetData);
- if(!packets.add(item)) {
- Logger.minor(this, "Got packet "+packetNumber+" on stream "+key+"
twice");
+ private class ForwardPublishUpTree implements Runnable {
+
+ final long uid;
+ final long packetNumber;
+ final PeerNode source;
+ final byte[] contents;
+ final Message origMessage;
+ // Yet another arbitrary timeout. Should be plenty considering these
are supposed to be real time streams!
+ public static final int TIMEOUT = 30000;
+
+ public ForwardPublishUpTree(long id, PeerNode src, long packetNo,
byte[] data, Message m) {
+ uid = id;
+ packetNumber = packetNo;
+ source = src;
+ contents = data;
+ origMessage = m;
+ }
+
+ public void run() {
+ try {
+ Logger.minor(this, "Running "+this);
+ PeerNode myParent;
+ while(true) {
+ synchronized(this) {
+ myParent = parent;
+ if(restarting) myParent = null;
+ if(upstreamRestarting) myParent = null;
+ }
+ if(myParent == null) {
+ // We just became root, or we restarted
+ handlePublishData(uid, source, origMessage);
+ return;
+ }
+ // Send upwards
+ Message msg = DMT.createFNPPublishData(contents, key,
packetNumber, uid);
+ try {
+ myParent.sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ lostConnection();
+ return;
+ }
+ // Wait for reply
+ MessageFilter mfSuccess =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataSucceeded);
+ MessageFilter mfRestarting =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataRejectedRestarting);
+ MessageFilter mfCollision =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataCollision);
+ MessageFilter mfInvalid =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPPublishDataInvalid);
+ MessageFilter mfOverload =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPRejectedOverload);
+ MessageFilter mfLoop =
MessageFilter.create().setTimeout(TIMEOUT).setField(DMT.UID,
uid).setSource(myParent).setMatchesDroppedConnection(true).setType(DMT.FNPRejectedLoop);
+
+ MessageFilter mf =
mfSuccess.or(mfRestarting.or(mfCollision.or(mfInvalid.or(mfOverload.or(mfLoop)))));
+
+ Message reply;
+
+ try {
+ reply = source.node.usm.waitFor(mf);
+ } catch (DisconnectedException e1) {
+ lostConnection();
return;
}
- PeerNode[] peers;
- // We don't strictly need to synchronize, but
- // if we don't we may lose packets.
+
+ if(reply == null && !myParent.isConnected()) {
+ Logger.error(this, "Did not get notified of parent
disconnection!");
+ lostConnection();
+ return;
+ }
+
+ if(reply == null || reply.getSpec() == DMT.FNPRejectedOverload) {
+ // Timeout
+ Logger.error(this, "Timeout - "+myParent+" did not respond in
"+TIMEOUT+"ms forwarding PublishData");
+ try {
+ source.sendAsync(DMT.createFNPRejectedOverload(uid), null);
+ } catch (NotConnectedException e1) {
+ Logger.normal(this, "Disconnected while sending rejected
due to overload: "+
+ source+" ("+key+","+packetNumber+")");
+ }
+ return;
+ }
+
+ if(reply.getSpec() == DMT.FNPRejectedLoop) {
synchronized(this) {
- peers = subscriberPeers;
+ if(myParent != parent) continue;
+ }
+ Logger.error(this, "Loop rejection in forwarding PublishData
to parent!");
+ // Tree topology is completely broken
+ forceSubscribe();
+ try {
+
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid), null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Disconnected while sending rejection
due to restart due to loop: "+
+ source+" ("+key+","+packetNumber+")");
+ }
+ return;
}
- if(peers != null)
- for(int i=0;i<peers.length;i++)
- item.forwardTo(peers[i]);
- // Redistribute it to local subscribers
- localSubscribers.processPacket(packetNumber, packetData);
+ if(reply.getSpec() == DMT.FNPPublishDataInvalid) {
+ // FIXME check it
+ try {
+ source.sendAsync(DMT.createFNPPublishDataInvalid(uid),
null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Disconnected while sending invalid
data rejection: "+
+ source+" ("+key+","+packetNumber+")");
+ }
+ return;
}
- public class PacketItem implements NumberedItem {
+ if(reply.getSpec() == DMT.FNPPublishDataRejectedRestarting) {
+ // FIXME enforcement - has it recently said it's restarting?
+ try {
+
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid), null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Disconnected while relaying
rejected:restarting: "+
+ source+" ("+key+","+packetNumber+")");
+ }
+ return;
+ }
- long packetNumber;
- byte[] data;
+ if(reply.getSpec() == DMT.FNPPublishDataCollision) {
+ // Yay finally a nontrivial failure mode!
+ long suggested = reply.getLong(DMT.SUGGESTED_SEQNO);
+ suggested = Math.max(suggested, packets.getLastNumber()+1);
+ try {
+ source.sendAsync(DMT.createFNPPublishDataCollision(uid,
suggested), null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Not connected while sending
collision: "+
+ source+" ("+key+","+packetNumber+")");
+ }
+ return;
+ }
+
+ if(reply.getSpec() == DMT.FNPPublishDataSucceeded) {
+ // Success!
+ PacketItem item = new PacketItem(packetNumber, contents);
+ packets.add(item);
+ try {
+ source.sendAsync(DMT.createFNPPublishDataSucceeded(uid),
null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Disconnected while adding new packet:
"+
+ source+" ("+key+","+packetNumber+")");
+ }
+ broadcast(item, source);
+ return;
+ }
+
+ Logger.error(this, "Unrecognized packet: "+reply);
+ return;
+ }
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t, t);
+ } finally {
+ source.node.unlockUID(uid);
+ source.node.completed(uid);
+ }
+ }
+
+ private void lostConnection() {
+ Logger.error(this, "Lost parent node");
+ try {
+
source.sendAsync(DMT.createFNPPublishDataRejectedRestarting(uid), null);
+ } catch (NotConnectedException e1) {
+ Logger.normal(this, "Disconnected while sending rejection due
to restart: "+
+ source+" ("+key+","+packetNumber+")");
+ }
+ }
- public PacketItem(long packetNumber, byte[] packetData) {
- this.packetNumber = packetNumber;
- data = packetData;
}
/**
- * Forward this packet to a subscriber node.
- * As an FNPSubscribeData. NOT an FNPPublishData.
- * DO NOT CALL WITH LOCKS HELD, as sends packets.
- * @param pn Node to send to.
+ * Handle a SubscribeRequest. Do not route it. Add the
+ * node to the list of subscriber nodes and send it our status.
+ * Caller should have checked ID for loops.
+ * @param m The message to reply to.
+ * @throws DroppingSubscriptionHandlerException If the node is currently
being dropped
+ * from the parent.
*/
- public void forwardTo(PeerNode pn) {
+ public void handleSubscribeRequest(Message m) throws
DroppingSubscriptionHandlerException {
+ long id = m.getLong(DMT.UID);
+ short htl = m.getShort(DMT.HTL);
+ long lastSeen = m.getLong(DMT.LAST_SEQNO);
+ int orderedMessagesSeq = m.getInt(DMT.ORDERED_MESSAGE_NUM);
+ PeerNode source = (PeerNode) m.getSource();
+ NumberedItem[] items; // packets sender has missed
+
+ // Reset the HTL
+ double origNearest = m.getDouble(DMT.NEAREST_LOCATION);
+ double nearest = origNearest;
+ double myLoc = source.node.lm.getLocation().getValue();
+ double target = key.toNormalizedDouble();
+ if(Math.abs(myLoc-target) < Math.abs(nearest-target)) {
+ htl = Node.MAX_HTL;
+ nearest = myLoc;
+ } else {
+ htl = source.node.decrementHTL(source, htl);
+ }
+ boolean nowRestarting = false;
+ SubscribeHandler handler;
+ try {
+ synchronized(this) {
+ if(isSubscribed(source)) {
+ Logger.error(this, "Node subscribing twice: "+source);
+ return;
+ }
+ if(dropping) throw new DroppingSubscriptionHandlerException();
+
+ // Collect the old packets to send when accepted
+ items = packets.getAfter(lastSeen);
+
+ // Will tell client node current status
+ handler = new SubscribeHandler(node, id, htl, source,
orderedMessagesSeq, origNearest, nearest, this, items);
+
+ if((!subscribed) && (!restarting)) {
+ restarting = true;
+ nowRestarting = true;
+ }
+ }
+ if(nowRestarting) {
+ // Create sender
+ startSubscribe(handler);
+ }
+ } finally {
+ if(nowRestarting && sender == null) {
+ restarting = false;
+ }
+ }
+
+ try {
+ if(items != null && items.length > 0) {
+ for(int i=0;i<items.length;i++) {
+ PacketItem item = (PacketItem)items[i];
+ item.forwardTo(source);
+ }
+ }
+ } catch (NotConnectedException e) {
+ Logger.error(this, "New subscriber node disconnected: "+source);
+ removeSubscriberHandler(handler);
+ }
+ }
+
+ /**
+ * Is the node already subscribed?
+ */
+ private synchronized boolean isSubscribed(PeerNode source) {
+ for(int i=0;i<subscriberPeerHandlers.length;i++) {
+ if(subscriberPeerHandlers[i].subscriber == source) return true;
+ }
+ return false;
+ }
+
+ private void startSubscribe(SubscribeHandler handler) {
+ long id = handler.origID;
+ short htl = handler.origHTL;
+ PeerNode source = handler.subscriber;
+ double nearest = handler.origNearestLoc;
+ startSubscribe(id, htl, nearest, source);
+ }
+
+ private void startSubscribe(long id, short htl, double nearest, PeerNode
source) {
+ synchronized(this) {
+ if(subscribed) return;
+ if(sender != null) return;
+ sender = new SubscribeSender(id, htl, source, nearest, this);
+ }
+ Thread t = new Thread(sender);
+ t.setDaemon(true);
+ t.start();
+ }
+
+ synchronized void addSubscriberHandler(SubscribeHandler sub) {
+ // Add to subscriberPeers
+ if(subscriberPeerHandlers != null) {
+ for(int i=0;i<subscriberPeerHandlers.length;i++) {
+ SubscribeHandler s = subscriberPeerHandlers[i];
+ if(s == sub) break;
+ if(s.subscriber == sub.subscriber) {
+ Logger.error(this, "Subscribing the same node twice", new
Exception("error"));
+ }
+ }
+ }
+ if(subscriberPeerHandlers == null || subscriberPeerHandlers.length ==
0) {
+ subscriberPeerHandlers = new SubscribeHandler[] { sub };
+ } else {
+ SubscribeHandler[] peers = new
SubscribeHandler[subscriberPeerHandlers.length+1];
+ System.arraycopy(subscriberPeerHandlers, 0, peers, 0,
subscriberPeerHandlers.length);
+ peers[peers.length-1] = sub;
+ subscriberPeerHandlers = peers;
+ }
+ }
+
+ public synchronized ClientSubscriptionHandler
makeClientSubscriptionHandler(
+ ClientPublishStreamKey key2) throws
DroppingSubscriptionHandlerException {
+ if(dropping) throw new DroppingSubscriptionHandlerException();
+ if (localSubscribers != null && localSubscribers.length > 0) {
+ for (int i = 0; i < localSubscribers.length; i++)
+ if (localSubscribers[i].getKey().equals(key2))
+ return localSubscribers[i];
+ }
+ ClientSubscriptionHandler csh = new ClientSubscriptionHandler(this,
+ key2);
+ if (localSubscribers == null)
+ localSubscribers = new ClientSubscriptionHandler[] { csh};
+ else {
+ ClientSubscriptionHandler[] handlers = new
ClientSubscriptionHandler[localSubscribers.length + 1];
+ if (localSubscribers.length > 0)
+ System.arraycopy(localSubscribers, 0, handlers, 0,
+ localSubscribers.length);
+ handlers[handlers.length - 1] = csh;
+ localSubscribers = handlers;
+ }
+ return csh;
+ }
+
+ // Can be called while dropping
+ public void remove(ClientSubscriptionHandler handler) {
+ synchronized(this) {
+ if(localSubscribers == null) {
+ Logger.error(this, "Removing "+handler+" but no subscribers!");
+ // Could happen after a throwable was caught perhaps?
+ }
+ int x = 0;
+ boolean seen = false;
+ for(int i=0;i<localSubscribers.length;i++) {
+ ClientSubscriptionHandler csh = localSubscribers[i];
+ if(csh != handler) x++;
+ else seen = true;
+ }
+ if(!seen)
+ Logger.error(this, "Removing "+handler+" but not found in
subscriber list");
+ ClientSubscriptionHandler[] handlers = new
ClientSubscriptionHandler[x];
+ x=0;
+ for(int i=0;i<localSubscribers.length;i++) {
+ ClientSubscriptionHandler csh = localSubscribers[i];
+ if(csh != handler) handlers[x++] = csh;
+ }
+ localSubscribers = handlers;
+ if(!shouldDrop()) return;
+ }
+ drop();
+ }
+
+ /**
+ * Drop ourself from the SubscriptionManager.
+ */
+ private void drop() {
+ synchronized(this) {
+ if(shouldDrop()) {
+ dropping = true;
+ } else {
+ return;
+ }
+ }
+ manager.drop(this, key);
+ }
+
+ /**
+ * Create a new subscribe request, unless we are connected already, or
+ * already running a subscribe request.
+ */
+ public void forceSubscribe() throws DroppingSubscriptionHandlerException {
+ boolean restartingNow = false;
+ try {
+ synchronized(this) {
+ if(dropping) throw new DroppingSubscriptionHandlerException();
+ if(restarting) {
+ return;
+ } else {
+ restartingNow = true;
+ restartUID = random.nextLong();
+ restarting = true;
+ }
+ }
+ startSubscribe(random.nextLong(), Node.MAX_HTL,
node.lm.getLocation().getValue(), null);
+ } finally {
+ if(restartingNow && sender == null)
+ restarting = false;
+ }
+ }
+
+ /**
+ * Callback from SubscribeSender, indicating a change in status.
+ * @param sender2
+ * @param status
+ */
+ void statusChange(SubscribeSender sender2, int status) {
// TODO Auto-generated method stub
}
- public long getNumber() {
- return packetNumber;
+ synchronized long getRestartUID() {
+ return restartUID;
}
+ public synchronized double targetLocation() {
+ return key.toNormalizedDouble();
}
+ /**
+ * @return True if we are currently subscribed or root
+ */
+ public synchronized boolean isSubscribed() {
+ return subscribed;
+ }
+
+ /**
+ * @return The location of the root node. If we are the root,
+ * returns our location.
+ */
+ public synchronized double rootLoc() {
+ return rootLocation;
+ }
+
+ /**
+ * @return The last sequence number we received, for purposes of
subscribing.
+ * This is not necessarily the greatest seq# we have seen.
+ * It is calculated as follows:
+ * - If we have no cached packets, we return -1. (Negative values are
invalid;
+ * this means we want everything, in order).
+ * - If our packets have entirely contiguous values, we return the number
of
+ * the highest packet (taking into account wraparound).
+ * - Otherwise we return the value of the packet before the first
noncontiguous
+ * packet.
+ */
+ public long getLastSeqNum() {
+ return packets.getLastContiguousSeqNum();
+ }
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
2005-10-12 18:56:58 UTC (rev 7424)
+++
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -2,6 +2,9 @@
import java.util.HashMap;
+import freenet.io.comm.DMT;
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
import freenet.keys.ClientPublishStreamKey;
import freenet.keys.PublishStreamKey;
import freenet.support.Logger;
@@ -18,55 +21,48 @@
// We have up to 32 subscriptions
private final int MAX_COUNT = 32;
- private final Node node;
+ final Node node;
/** map: key -> sub. definitively has all subs. */
private final HashMap subscriptionsByKey;
// /** map: parent node -> linkedlist of subs. does not include
parent==null i.e. parent==this. */
// private final HashMap subscriptionsByParent;
// /** map: child node -> linkedlist of subs. some subs may have no
children so won't be included. */
// private final HashMap subscriptionsByChildren;
- /** Client subscriptions. These are the ClientSubscriptionHandler's. */
- private final HashMap clientSubscriptionsByKey;
SubscriptionManager(Node n) {
node = n;
subscriptionsByKey = new HashMap();
// subscriptionsByParent = new HashMap();
// subscriptionsByChildren = new HashMap();
- clientSubscriptionsByKey = new HashMap();
}
/**
* Local subscription.
* Add the stream if necessary, and subscribe to it.
* @return Null if cannot subscribe, otherwise the ClientSubscription.
+ *
+ * Architecture:
+ * - Find (or make, and start a subscribe) the SubscriptionHandler.
+ * - Find (or make) the ClientSubscriptionHandler
+ * - Find (or make) the ClientSubscriptionImpl
*/
public ClientSubscription localSubscribe(ClientPublishStreamKey key,
SubscriptionCallback cb) {
- // FIXME implement doing a sub request
- // For now we will just eavesdrop on locally passed subs.
- // Each pretends it is the root
- ClientSubscriptionHandler csh;
- boolean add = false;
- Logger.minor(this, "Subscribing locally to "+key);
- synchronized(this) {
- csh = (ClientSubscriptionHandler)
clientSubscriptionsByKey.get(key);
- if(csh == null) {
- csh = new ClientSubscriptionHandler(this, key);
- add = true;
- clientSubscriptionsByKey.put(key, csh);
- }
- }
- if(add) {
- Logger.minor(this, "New subscription to "+key);
+ while(true) {
+ try {
SubscriptionHandler sub = makeSubscription(key.getKey());
- if(sub.setLocal(csh) != null) {
- Logger.error(this, "Had local already! for "+key);
- }
- }
+ if(sub == null)
+ return null; // too many streams
+
+ ClientSubscriptionHandler csh =
+ sub.makeClientSubscriptionHandler(key);
+
ClientSubscriptionImpl csi = new ClientSubscriptionImpl(key, cb, csh);
csh.add(csi);
- // FIXME implement the rest - especially sending sub reqs out
return csi;
+ } catch (DroppingSubscriptionHandlerException e) {
+ Logger.minor(this, "Caught "+e);
+ }
+ }
}
/**
@@ -75,7 +71,11 @@
* subscription.
*/
private synchronized SubscriptionHandler makeSubscription(PublishStreamKey
key) {
- SubscriptionHandler sub = (SubscriptionHandler)
subscriptionsByKey.get(key);
+ SubscriptionHandler sub;
+ while(true) {
+ try {
+ synchronized(this) {
+ sub = (SubscriptionHandler) subscriptionsByKey.get(key);
if(sub != null) return sub;
if(subscriptionsByKey.size() >= MAX_COUNT) {
Logger.normal(this, "Rejecting subscription for "+key);
@@ -83,39 +83,113 @@
}
// Make a new one
- sub = new SubscriptionHandler(key);
+ sub = new SubscriptionHandler(this, key, node.random);
subscriptionsByKey.put(key, sub);
- return sub;
}
-
- public synchronized void remove(ClientSubscriptionHandler handler) {
- PublishStreamKey key = handler.getKey().getKey();
- SubscriptionHandler sub = (SubscriptionHandler)
subscriptionsByKey.get(key);
- ClientSubscriptionHandler oldHandler = sub.setLocal(handler);
- if(oldHandler != null && oldHandler != handler) {
- Logger.error(this, "Already had a different handler:
"+oldHandler+" should be "+handler);
+ sub.forceSubscribe();
+ return sub;
+ } catch (DroppingSubscriptionHandlerException e) {
+ Logger.minor(this, "Caught "+e+" in makeSubscription");
}
- if(sub.shouldDrop()) {
- drop(sub, key);
}
}
- private synchronized void drop(SubscriptionHandler sub, PublishStreamKey
key) {
- subscriptionsByKey.remove(key);
+ synchronized void drop(SubscriptionHandler sub, PublishStreamKey key) {
+ SubscriptionHandler oldSub =
+ (SubscriptionHandler) subscriptionsByKey.remove(key);
+ if(oldSub != sub) {
+ Logger.error(this, "Dropping "+sub+" but remove returned "+oldSub);
+ }
}
/**
- * Handle a received packet.
+ * Handle a PublishData packet:
+ * <ul><li>If we do not have the key, return an error.</li>
+ * <li>If we have the key, and have the same packet, with the
+ * same contents, return FNPPublishSucceeded.</li>
+ * <li>If we have the key, and have the same packet with
+ * different contents, return FNPPublishCollision with a suggested
+ * next packet number.</li>
+ * <li>If we are not the root, forward to our parent, and wait
+ * for FNPPublishSucceeded or FNPPublishCollision.</li>
+ * <li>If we are the root, and we don't have that packet number,
+ * accept it; FNPPublishSucceeded. Then broadcast it to all other
+ * nodes on the tree.</li>
+ * @return True unless we want the message to be put back
+ * onto the queue.
*/
- public void receivedPacket(PublishStreamKey key, long packetNumber, byte[]
packetData, PeerNode source) {
+ public boolean handlePublishData(Message m) {
+ // Firstly, do we have the key?
+ PublishStreamKey key = (PublishStreamKey) m.getObject(DMT.KEY);
+ PeerNode source = (PeerNode) m.getSource();
+ long id = m.getLong(DMT.UID);
+ if(!node.lockUID(id) || node.recentlyCompleted(id)) {
+ Logger.error(this, "Rejected "+id+" - loop");
+ try {
+ Message loop = DMT.createFNPRejectedLoop(id);
+ source.sendAsync(loop, null);
+ } catch (NotConnectedException e) {
+ Logger.error(this, "Not connected sending reject:loop on "+id);
+ }
+ return true;
+ }
+ try {
SubscriptionHandler sub;
synchronized(this) {
sub = (SubscriptionHandler) subscriptionsByKey.get(key);
}
if(sub == null) {
- Logger.normal(this, "Dropped sub packet from "+source+" on "+key+"
- not subscribed");
- return;
+ // We do not have the key!
+ Message reply = DMT.createFNPPublishDataNotSubscribed(id);
+ source.sendAsync(reply, null);
+ return true;
+ }
+ return sub.handlePublishData(id, source, m);
+ } catch (Throwable t) {
+ node.unlockUID(id);
+ node.completed(id);
+ Logger.error(this, "Caught "+t, t);
+ return true;
}
- sub.processPacket(packetNumber, packetData, source);
}
+
+ /**
+ * Handle a SubscribeRequest.
+ * @return True unless we want the message to be put back
+ * onto the queue.
+ */
+ public boolean handleSubscribeRequest(Message m) {
+ // Do we have the stream in question, firstly?
+ PublishStreamKey key = (PublishStreamKey) m.getObject(DMT.KEY);
+ PeerNode source = (PeerNode) m.getSource();
+ long id = m.getLong(DMT.UID);
+ if(!node.lockUID(id) || node.recentlyCompleted(id)) {
+ Logger.error(this, "Rejected "+id+" - loop");
+ try {
+ Message loop = DMT.createFNPRejectedLoop(id);
+ source.sendAsync(loop, null);
+ } catch (NotConnectedException e) {
+ Logger.error(this, "Not connected sending reject:loop on "+id);
+ }
+ return true;
+ }
+ try {
+ SubscriptionHandler sub;
+ synchronized(this) {
+ sub = (SubscriptionHandler) subscriptionsByKey.get(key);
+ if(sub == null) {
+ if(subscriptionsByKey.size() < MAX_COUNT) {
+ sub = new SubscriptionHandler(this, key, node.random);
+ subscriptionsByKey.put(key, sub);
+ }
+ }
+ }
+ sub.handleSubscribeRequest(m);
+ } catch (Throwable t) {
+ node.unlockUID(id);
+ node.completed(id);
+ Logger.error(this, "Caught "+t+" handling "+m, t);
+ }
+ return true;
+ }
}
Modified:
branches/publish-subscribe/freenet/src/freenet/support/NumberedRecentItems.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/support/NumberedRecentItems.java
2005-10-12 18:56:58 UTC (rev 7424)
+++
branches/publish-subscribe/freenet/src/freenet/support/NumberedRecentItems.java
2005-10-12 19:50:08 UTC (rev 7425)
@@ -80,8 +80,8 @@
};
}
- public synchronized NumberedItem get(int num) {
- int x = java.util.Arrays.binarySearch(items, new Integer(num),
myComparator);
+ public synchronized NumberedItem get(long num) {
+ int x = java.util.Arrays.binarySearch(items, new Long(num),
myComparator);
if(x >= 0) return items[x];
return null;
}
@@ -162,4 +162,46 @@
System.arraycopy(items, firstGreater, out, 0,
items.length-firstGreater);
return out;
}
+
+ /**
+ * @return The number of the item with the highest value
+ * so far. If the list is completely empty this will throw
+ * an NPE.
+ */
+ public synchronized long getLastNumber() {
+ return items[items.length-1].getNumber();
+ }
+
+ /**
+ * @return The last sequence number we received, for purposes of
subscribing.
+ * This is not necessarily the greatest seq# we have seen.
+ * It is calculated as follows:
+ * - If we have no cached packets, we return -1. (Negative values are
invalid;
+ * this means we want everything, in order).
+ * - If our packets have entirely contiguous values, we return the number
of
+ * the highest packet (taking into account wraparound).
+ * - Otherwise we return the value of the packet before the first
noncontiguous
+ * packet.
+ */
+ public synchronized long getLastContiguousSeqNum() {
+ checkSorted(); // might as well
+ long max = -1;
+ boolean nullSoFar = true;
+ for(int i=0;i<items.length;i++) {
+ NumberedItem item = items[i];
+ if(item != null) {
+ long x = item.getNumber();
+ if(nullSoFar) {
+ max = x;
+ } else {
+ if(x == max+1)
+ max = x;
+ else
+ return max;
+ }
+ nullSoFar = false;
+ }
+ }
+ return max;
+ }
}