Author: toad
Date: 2005-10-18 19:54:33 +0000 (Tue, 18 Oct 2005)
New Revision: 7438
Modified:
branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionImpl.java
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionCallback.java
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
branches/publish-subscribe/freenet/src/freenet/node/TextModeClientInterface.java
Log:
Lots of changes. Just about compiles, except for resubscribe support.
Modified: branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
2005-10-18 14:14:07 UTC (rev 7437)
+++ branches/publish-subscribe/freenet/src/freenet/io/comm/DMT.java
2005-10-18 19:54:33 UTC (rev 7438)
@@ -934,6 +934,18 @@
return msg;
}
+ public static final MessageType FNPUnsubscribe = new
MessageType("FNPUnsubscribe") {{
+ addField(UID, Long.class);
+ }};
+
+
+ public static Message createFNPUnsubscribe(long id, int counter) {
+ Message msg = new Message(FNPUnsubscribe);
+ msg.set(UID, id);
+ msg.set(ORDERED_MESSAGE_NUM, counter);
+ return msg;
+ }
+
public static final MessageType FNPPublishDataCollision = new
MessageType("FNPPublishDataCollision") {{
addField(UID, Long.class);
addField(SUGGESTED_SEQNO, Long.class);
@@ -963,5 +975,4 @@
// }
//
public static void init() { }
-
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
2005-10-18 14:14:07 UTC (rev 7437)
+++
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionHandler.java
2005-10-18 19:54:33 UTC (rev 7438)
@@ -56,4 +56,25 @@
impl.processPacket(packetNumber, finalData);
}
}
+
+ public synchronized void notifyConnected() {
+ for(Iterator i=clientSubs.iterator();i.hasNext();) {
+ ClientSubscriptionImpl sub = (ClientSubscriptionImpl)
i.next();
+ sub.notifyConnected();
+ }
+ }
+
+ public synchronized void forceDisconnect() {
+ for(Iterator i=clientSubs.iterator();i.hasNext();) {
+ ClientSubscriptionImpl sub = (ClientSubscriptionImpl)
i.next();
+ sub.forceDisconnect();
+ }
+ }
+
+ public synchronized void notifyDisconnected() {
+ for(Iterator i=clientSubs.iterator();i.hasNext();) {
+ ClientSubscriptionImpl sub = (ClientSubscriptionImpl)
i.next();
+ sub.notifyDisconnected();
+ }
+ }
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionImpl.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionImpl.java
2005-10-18 14:14:07 UTC (rev 7437)
+++
branches/publish-subscribe/freenet/src/freenet/node/ClientSubscriptionImpl.java
2005-10-18 19:54:33 UTC (rev 7438)
@@ -46,4 +46,28 @@
}
}
+ public void notifyConnected() {
+ try {
+ cb.connected();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t+" from callback in
notifyConnected on "+this, t);
+ }
+ }
+
+ public void forceDisconnect() {
+ try {
+ cb.lostConnectionFatal();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t+" from callback in
forceDisconnect on "+this, t);
+ }
+ disconnect();
+ }
+
+ public void notifyDisconnected() {
+ try {
+ cb.lostConnection();
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t+" from callback in
notifyDisconnected on "+this, t);
+ }
+ }
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
===================================================================
--- branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
2005-10-18 14:14:07 UTC (rev 7437)
+++ branches/publish-subscribe/freenet/src/freenet/node/SubscribeSender.java
2005-10-18 19:54:33 UTC (rev 7438)
@@ -20,6 +20,7 @@
private short htl;
final PeerNode source;
final SubscriptionHandler handler;
+ final SubscribeHandler origSubscriber;
final double nearestSoFar;
private final HashSet nodesRoutedTo;
private PeerNode next;
@@ -56,8 +57,10 @@
* @param source The source node. Null if this is locally originated.
* @param handler The SubscriptionHandler which created us, and who want
to know
* when our status changes.
+ * @param sub The SubscribeHandler which originated this SubscribeSender,
if any.
+ * Null if this is a locally originated subscription.
*/
- public SubscribeSender(long id, short htl, PeerNode source, double
nearestSoFar, SubscriptionHandler handler) {
+ public SubscribeSender(long id, short htl, PeerNode source, double
nearestSoFar, SubscriptionHandler handler, SubscribeHandler sub) {
this.id = id;
this.htl = htl;
this.source = source;
@@ -67,6 +70,7 @@
node = handler.node;
status = SEARCHING;
nodesRoutedTo = new HashSet();
+ origSubscriber = sub;
}
public void run() {
@@ -146,13 +150,18 @@
if(rootLoc < 0.0 || rootLoc > 1.0) {
Logger.error(this, "Invalid root loc: "+rootLoc+" from
"+next);
msg = DMT.createFNPUnsubscribe(id, counter);
- next.sendAsync(msg, null);
- continue;
+ try {
+ next.sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Not
connected sending unsub (invalid root loc "+rootLoc+") to "+next);
+ }
+ handler.invalidSuccessInRestarting();
+ setStatus(SEARCHING);
}
// Validated success
- handler.subscribeSucceeded(next, rootLoc);
+ handler.subscribeSucceeded(next, rootLoc,
(short)(origSubscriber.origHTL - htl));
setStatus(SUCCESS);
- break;
+ return;
}
} catch (DisconnectedException e) {
@@ -272,7 +281,7 @@
MessageFilter mfSubscribeRestarted =
MessageFilter.create().setField(DMT.UID,
id).setSource(next).setType(DMT.FNPSubscribeRestarted).setTimeout(INITIAL_TIMEOUT).setField(DMT.ORDERED_MESSAGE_NUM,
counter);
counter++;
- MessageFilter mf =
mfRejectedLoop.or(mfRejectedOverload.or(mfSubscribeSucceeded.or(mfSubscribeRestarted)));
+ MessageFilter mf =
mfRejectedLoop.or(mfRejectedOverload.or(mfSubscribeSucceeded.or(mfSubscribeRestarted.or(mfRouteNotFound))));
try {
msg = node.usm.waitFor(mf);
@@ -302,11 +311,15 @@
if(rootLoc < 0.0 || rootLoc > 1.0) {
Logger.error(this, "Invalid root loc: "+rootLoc+" from
"+next);
msg = DMT.createFNPUnsubscribe(id, counter);
- next.sendAsync(msg, null);
+ try {
+ next.sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ Logger.error(this,
"NotConnectedException sending "+msg+" to "+next);
+ }
continue;
}
// Validated success
- handler.subscribeSucceeded(next, rootLoc);
+ handler.subscribeSucceeded(next, rootLoc,
(short)(origSubscriber.origHTL - htl));
setStatus(SUCCESS);
break;
}
@@ -338,5 +351,9 @@
Logger.debug(this, "Setting status to "+newStatus+" on "+this);
status = newStatus;
}
+
+ public short getHTL() {
+ return htl;
+ }
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionCallback.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionCallback.java
2005-10-18 14:14:07 UTC (rev 7437)
+++
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionCallback.java
2005-10-18 19:54:33 UTC (rev 7438)
@@ -5,12 +5,19 @@
*/
public interface SubscriptionCallback {
+ /** Received a packet */
void got(long packetNumber, byte[] data);
+ /** Lost the subscription */
void lostConnection();
+ /** Permanently lost the connection */
+ void lostConnectionFatal();
+
+ /** Restarted the subscription */
void restarted();
+ /** Connected to the stream */
void connected();
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
2005-10-18 14:14:07 UTC (rev 7437)
+++
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionHandler.java
2005-10-18 19:54:33 UTC (rev 7438)
@@ -527,7 +527,7 @@
// Collect the old packets to send when accepted
items = packets.getAfter(lastSeen);
- // Will tell client node current status
+ // Will tell client node current status, and add self to us
handler = new SubscribeHandler(node, id, htl, source,
orderedMessagesSeq, origNearest, nearest, this, items);
if(subscribed) {
@@ -579,7 +579,7 @@
}
}
- /**
+ /**
* Is the node already subscribed?
*/
private synchronized boolean isSubscribed(PeerNode source) {
@@ -594,14 +594,14 @@
short htl = handler.origHTL;
PeerNode source = handler.subscriber;
double nearest = handler.origNearestLoc;
- startSubscribe(id, htl, nearest, source);
+ startSubscribe(id, htl, nearest, source, handler);
}
- private void startSubscribe(long id, short htl, double nearest, PeerNode
source) {
+ private void startSubscribe(long id, short htl, double nearest, PeerNode
source, SubscribeHandler sub) {
synchronized(this) {
if(subscribed) return;
if(sender != null) return;
- sender = new SubscribeSender(id, htl, source, nearest, this);
+ sender = new SubscribeSender(id, htl, source, nearest, this, sub);
}
Thread t = new Thread(sender);
t.setDaemon(true);
@@ -711,7 +711,7 @@
restarting = true;
}
}
- startSubscribe(random.nextLong(), Node.MAX_HTL,
node.lm.getLocation().getValue(), null);
+ startSubscribe(random.nextLong(), Node.MAX_HTL,
node.lm.getLocation().getValue(), null, null);
} finally {
if(restartingNow && sender == null)
restarting = false;
@@ -719,7 +719,10 @@
}
synchronized long getRestartUID() {
- return restartUID;
+ if(upstreamRestarting)
+ return upstreamRestartUID;
+ else
+ return restartUID;
}
public synchronized double targetLocation() {
@@ -738,6 +741,8 @@
* returns our location.
*/
public synchronized double rootLoc() {
+ if(isRoot())
+ return node.lm.getLocation().getValue();
return rootLocation;
}
@@ -761,31 +766,214 @@
* @param next The node we are now subscribed through.
* @param rootLoc The tree root's location.
*/
- public void subscribeSucceeded(PeerNode next, double rootLoc) {
- SubscribeHandler[] handlers;
+ public void subscribeSucceeded(PeerNode next, double rootLoc, short
htlDiff) {
+ ClientSubscriptionHandler[] clients;
synchronized(this) {
+ SubscribeHandler[] handlers;
parent = next;
subscribed = true;
+ // Do not null sender, as still using it; it persists
for as long as the subscription does
+ restarting = false;
rootLocation = rootLoc;
handlers = subscriberPeerHandlers;
- for(int i=0;i<handlers.length;i++) {
- SubscribeHandler handler = handlers[i];
- if(Math.abs(handler.origNearestLoc-target) <
Math.abs(rootLoc-target)) {
- // htl 0 because of root
- Message msg =
DMT.createFNPRouteNotFound(handler.origID, (short)0);
- try {
-
handler.subscriber.sendAsync(msg, null);
+ if(handlers != null) {
+ for(int i=0;i<handlers.length;i++) {
+ SubscribeHandler handler = handlers[i];
+
if(Math.abs(handler.origNearestLoc-target) <= Math.abs(rootLoc-target)) {
+ // If ==, better to go to the
old one.
+ // htl 0 because of root
+ Message msg =
DMT.createFNPRouteNotFound(handler.origID, (short)Math.max(0,
handler.origHTL-htlDiff));
+ try {
+
handler.subscriber.sendAsync(msg, null);
+ } catch (NotConnectedException
e) {
+ // Will not happen
often!
+ Logger.minor(this,
"Rejected because orig closer than root, but conn closed on "+handler.origID+"
for "+this);
+ }
+
removeSubscriberHandler(handler);
+ continue;
+ } else {
+ handler.sendStatusMessage(true,
rootLoc);
+ // Stays on list because is
subscribed
+ }
+ }
+ }
+ clients = localSubscribers;
+ }
+ for(int i=0;i<clients.length;i++) {
+ clients[i].notifyConnected();
+ }
+ }
+
+ /** Our SubscribeSender could not find the stream it was trying to
+ * subscribe to. If we are closer to the target than the current
+ * SubscribeHandler's best-so-far, we become the root node. Otherwise
+ * we send it back an RNF, and possibly restart the subscription
+ * attempt with the next SubscribeHandler.
+ * @throws DroppingSubscriptionHandlerException
+ */
+ void senderRNF() {
+ Logger.minor(this, "Sender could not find stream on "+this);
+ boolean becomeRoot = false;
+ short htlDiff = 0;
+ synchronized(this) {
+ SubscribeHandler origSub = sender.origSubscriber;
+ if(origSub == null) {
+ sender = null;
+ becomeRoot = true;
+ htlDiff = 0;
+ } else {
+ double prevBest = origSub.origNearestLoc;
+ double me = node.lm.getLocation().getValue();
+ if(Math.abs(prevBest-target) >
Math.abs(me-target)) {
+ // Succeed
+ sender = null;
+ becomeRoot = true;
+ htlDiff = (short) (sender.getHTL() -
origSub.origHTL);
+ } else {
+ // Fail: Send an RNF to the origSub.
+ Message rnf =
DMT.createFNPRouteNotFound(origSub.origID, sender.getHTL());
+ try {
+
origSub.subscriber.sendAsync(rnf, null);
} catch (NotConnectedException e) {
- // Will not happen often!
- Logger.minor(this, "Rejected
because orig closer than root, but conn closed on "+handler.origID+" for
"+this);
+ // Doh
+ Logger.normal(this, "Lost
connection to subscriber while telling it it failed: "+origSub+" on "+this);
}
- removeSubscriberHandler(handler);
- continue;
- } else {
- handler.sendStatusMessage(true,
rootLoc);
- // Stays on list because is subscribed
+ // Now, is there another one to try?
+ removeSubscriberHandler(origSub);
+ maybeRestartSenderOnError();
}
}
}
+ if(becomeRoot)
+ becomeRoot(htlDiff);
+ if(dropping)
+ manager.drop(this, key);
}
+
+ /**
+ * Restart the sender if possible, after an error, unless shouldDrop(),
+ * in which case set dropping.
+ */
+ private synchronized void maybeRestartSenderOnError() {
+ if(!shouldDrop()) {
+ sender = null;
+ if(subscriberPeerHandlers != null &&
subscriberPeerHandlers.length > 0) {
+ SubscribeHandler handler =
subscriberPeerHandlers[0];
+ startSubscribe(handler);
+ } else {
+ // Local subscriptions
+ try {
+ forceSubscribe();
+ } catch (DroppingSubscriptionHandlerException
e) {
+ // Hrrrrrmm...
+ if(shouldDrop()) {
+ Logger.normal(this, "Got "+e);
+ } else {
+ Logger.error(this, "Got: "+e+"
but !shouldDrop() !!");
+ }
+ }
+ // If that fails, we become root
+ }
+ } else {
+ sender = null;
+ subscribed = false;
+ restarting = false;
+ dropping = true;
+ }
+ }
+
+ /** Become root */
+ synchronized void becomeRoot(short htlDiff) {
+ subscribeSucceeded(null, node.lm.getLocation().getValue(),
htlDiff);
+ }
+
+ /** Sender received RejectedOverload.
+ * Forward it to the original requestor, and run another one if
possible.
+ * If this was a local request, tell all clients they are disconnected.
+ */
+ void senderRejectedOverload() {
+ ClientSubscriptionHandler[] clients;
+ synchronized(this) {
+ SubscribeHandler origSub = sender.origSubscriber;
+ if(origSub != null) {
+ Message rejected =
DMT.createFNPRejectedOverload(origSub.origID);
+ try {
+ origSub.subscriber.sendAsync(rejected,
null);
+ } catch (NotConnectedException e) {
+ Logger.minor(this, "Failed to send
RejectedOverload to "+origSub.subscriber+" for "+this);
+ }
+ removeSubscriberHandler(origSub);
+ // Exactly the same as above - either restart
or drop
+ maybeRestartSenderOnError();
+ return;
+ }
+ clients = localSubscribers;
+ localSubscribers = null;
+ }
+ // If still here, was local-only, so we cancel the local
requests.
+ for(int i=0;i<clients.length;i++) {
+ clients[i].forceDisconnect();
+ }
+ }
+
+ /** Upstream restarted */
+ void subscribeRestarted(PeerNode next, long restartID) {
+ SubscribeHandler[] subscribers;
+ synchronized(this) {
+ parent = next;
+ upstreamRestarting = true;
+ upstreamRestartUID = restartID;
+ subscribers = subscriberPeerHandlers;
+ }
+ for(int i=0;i<subscribers.length;i++)
+ subscribers[i].sendStatusMessage(false, rootLoc());
+ }
+
+ /** Generic method for upstream restart failed */
+ private void upstreamRestartFailed() {
+ SubscribeHandler[] subscribers;
+ synchronized(this) {
+ // Kill upstream restart
+ upstreamRestarting = false;
+ subscribers = subscriberPeerHandlers;
+ }
+ for(int i=0;i<subscribers.length;i++)
+ subscribers[i].sendStatusMessage(false, rootLoc());
+ }
+
+ /**
+ * Got an RNF while upstream was restarting.
+ * Just send our old restart ID out.
+ */
+ void senderRestartingSearchingRNF() {
+ upstreamRestartFailed();
+ }
+
+ /** Lost connection to node which was restarting. */
+ void lostParentConnectionWasRestarting() {
+ upstreamRestartFailed();
+ }
+
+ /** Restarting would-be parent sent an invalid success message */
+ void invalidSuccessInRestarting() {
+ upstreamRestartFailed();
+ }
+
+ /** Had a connection, lost it */
+ public void lostParentConnectionWasSuccessful() {
+ SubscribeHandler[] handlers;
+ ClientSubscriptionHandler[] clients;
+ synchronized(this) {
+ restarting = true;
+ subscribed = false;
+ rootLocation = -1.0;
+ parent = null;
+ handlers = subscriberPeerHandlers;
+ clients = localSubscribers;
+ }
+ for(int i=0;i<handlers.length;i++)
+ handlers[i].sendStatusMessage(false, -1.0);
+ for(int i=0;i<clients.length;i++)
+ clients[i].notifyDisconnected();
+ }
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
2005-10-18 14:14:07 UTC (rev 7437)
+++
branches/publish-subscribe/freenet/src/freenet/node/SubscriptionManager.java
2005-10-18 19:54:33 UTC (rev 7438)
@@ -97,7 +97,7 @@
synchronized void drop(SubscriptionHandler sub, PublishStreamKey key) {
SubscriptionHandler oldSub =
(SubscriptionHandler) subscriptionsByKey.remove(key);
- if(oldSub != sub) {
+ if(oldSub != sub && oldSub != null) {
Logger.error(this, "Dropping "+sub+" but remove returned "+oldSub);
}
}
Modified:
branches/publish-subscribe/freenet/src/freenet/node/TextModeClientInterface.java
===================================================================
---
branches/publish-subscribe/freenet/src/freenet/node/TextModeClientInterface.java
2005-10-18 14:14:07 UTC (rev 7437)
+++
branches/publish-subscribe/freenet/src/freenet/node/TextModeClientInterface.java
2005-10-18 19:54:33 UTC (rev 7438)
@@ -513,48 +513,35 @@
}
public void got(long packetNumber, byte[] data) {
- try {
- subscribedDataStream.write(name+":"+packetNumber+":"+new
String(data)+"\n");
- subscribedDataStream.flush();
- } catch (IOException e) {
- String s = "Error writing to subscriptions output file - disk
full? "+e.getMessage();
- Logger.error(this, s);
- System.err.println(s);
- }
+ write(name+":"+packetNumber+":"+new String(data)+"\n");
}
- public void lostConnection() {
+ private void write(String string) {
try {
- subscribedDataStream.write(name+":LOST CONNECTION\n");
+ subscribedDataStream.write(string);
subscribedDataStream.flush();
} catch (IOException e) {
String s = "Error writing to subscriptions output file - disk
full? "+e.getMessage();
Logger.error(this, s);
System.err.println(s);
}
+ }
+
+ public void lostConnection() {
+ write(name+":LOST CONNECTION\n");
}
public void restarted() {
- try {
- subscribedDataStream.write(name+":RESTARTED\n");
- subscribedDataStream.flush();
- } catch (IOException e) {
- String s = "Error writing to subscriptions output file - disk
full? "+e.getMessage();
- Logger.error(this, s);
- System.err.println(s);
- }
+ write(name+":RESTARTED\n");
}
public void connected() {
- try {
- subscribedDataStream.write(name+":CONNECTED\n");
- subscribedDataStream.flush();
- } catch (IOException e) {
- String s = "Error writing to subscriptions output file - disk
full? "+e.getMessage();
- Logger.error(this, s);
- System.err.println(s);
- }
+ write(name+":CONNECTED\n");
}
+
+ public void lostConnectionFatal() {
+ write(name+":PERMANENTLY CONNECTION\n");
+ }
}
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs