Update of /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv7319/src/freenet/node
Modified Files:
Node.java Version.java NodeDispatcher.java SimpleClient.java
TextModeClientInterface.java
Added Files:
SubscriptionCallback.java PublishContext.java
ClientSubscriptionImpl.java SubscriptionManager.java
ClientSubscriptionHandler.java PublishHandlerSender.java
ClientSubscription.java SubscriptionHandler.java
Log Message:
Build 136:
Initial publish/subscribe support.
Publish works. Subscribe requests are not yet routed; we just subscribe
locally, if the stream happens to pass through us.
--- NEW FILE: SubscriptionCallback.java ---
package freenet.node;
/**
* Publish/Subscribe subscriber callback.
*/
public interface SubscriptionCallback {
void got(long packetNumber, byte[] data);
void lostConnection();
void restarted();
void connected();
}
--- NEW FILE: PublishContext.java ---
package freenet.node;
import freenet.keys.ClientPublishStreamKey;
/**
* Context for a locally originated publish stream.
*/
public class PublishContext {
final ClientPublishStreamKey key;
final Node node;
/** Last packet number. Incremented each time, modulo 2^63-1 */
private long lastPacketNumber;
/**
* @param key
*/
public PublishContext(ClientPublishStreamKey key, Node node) {
this.key = key;
this.node = node;
lastPacketNumber = Math.abs(node.random.nextLong());
}
/**
* Publish a block of data to the stream. Must not exceed
* size limit.
*/
public void publish(byte[] data) {
long packetNumber;
synchronized(this) {
packetNumber = lastPacketNumber;
long next = lastPacketNumber+1;
if(next < 0) next = 0;
lastPacketNumber = next;
}
byte[] encrypted =
key.encrypt(packetNumber, data, node.random);
PublishHandlerSender ps = new PublishHandlerSender(key.getKey(),
node.random.nextLong(), packetNumber, encrypted, node, null, -1.0);
}
}
--- NEW FILE: ClientSubscriptionImpl.java ---
package freenet.node;
import freenet.keys.ClientPublishStreamKey;
import freenet.support.Logger;
public class ClientSubscriptionImpl implements ClientSubscription {
private ClientPublishStreamKey key;
private SubscriptionCallback cb;
private ClientSubscriptionHandler ch;
private boolean finished = false;
/**
* @param key2
* @param cb2
* @param csh
*/
public ClientSubscriptionImpl(ClientPublishStreamKey key2,
SubscriptionCallback cb2, ClientSubscriptionHandler csh) {
cb = cb2;
key = key2;
ch = csh;
}
public ClientPublishStreamKey getKey() {
return key;
}
public SubscriptionCallback getCallback() {
return cb;
}
public void disconnect() {
finished = true;
ch.remove(this);
}
/**
* Received decrypted data from the stream.
*/
public void processPacket(long packetNumber, byte[] finalData) {
try {
cb.got(packetNumber, finalData);
} catch (Throwable t) {
Logger.error(this, "Caught "+t+" from callback in processPacket("+
packetNumber+") on "+this, t);
}
}
}
--- NEW FILE: SubscriptionManager.java ---
package freenet.node;
import java.util.HashMap;
import freenet.keys.ClientPublishStreamKey;
import freenet.keys.PublishStreamKey;
import freenet.support.Logger;
/**
* Tracks Publish/Subscribe streams:
* - Local subscriptions.
* - Remote subscriptions (other nodes subscribing to a stream through us).
* - Whether we are root for a given stream.
* - Nodes that we subscribe through to get a given stream (i.e. our tree
parent).
*/
public class SubscriptionManager {
// We have up to 32 subscriptions
private final int MAX_COUNT = 32;
private final Node node;
/** map: key -> sub. definitively has all subs. */
private final HashMap subscriptionsByKey;
// /** map: parent node -> linkedlist of subs. does not include parent==null
i.e. parent==this. */
// private final HashMap subscriptionsByParent;
// /** map: child node -> linkedlist of subs. some subs may have no children
so won't be included. */
// private final HashMap subscriptionsByChildren;
/** Client subscriptions. These are the ClientSubscriptionHandler's. */
private final HashMap clientSubscriptionsByKey;
SubscriptionManager(Node n) {
node = n;
subscriptionsByKey = new HashMap();
// subscriptionsByParent = new HashMap();
// subscriptionsByChildren = new HashMap();
clientSubscriptionsByKey = new HashMap();
}
/**
* Local subscription.
* Add the stream if necessary, and subscribe to it.
* @return Null if cannot subscribe, otherwise the ClientSubscription.
*/
public ClientSubscription localSubscribe(ClientPublishStreamKey key,
SubscriptionCallback cb) {
// FIXME implement doing a sub request
// For now we will just eavesdrop on locally passed subs.
// Each pretends it is the root
ClientSubscriptionHandler csh;
boolean add = false;
synchronized(this) {
csh = (ClientSubscriptionHandler) clientSubscriptionsByKey.get(key);
if(csh == null) {
csh = new ClientSubscriptionHandler(this, key);
add = true;
clientSubscriptionsByKey.put(key, csh);
}
}
if(add) {
Logger.minor(this, "New subscription to "+key);
SubscriptionHandler sub = makeSubscription(key.getKey());
if(sub.setLocal(csh) != null) {
Logger.error(this, "Had local already! for "+key);
}
}
ClientSubscriptionImpl csi = new ClientSubscriptionImpl(key, cb, csh);
csh.add(csi);
// FIXME implement the rest - especially sending sub reqs out
return csi;
}
/**
* Create a back-end subscription. This can be called
* as a result of a subscribe request or of a local
* subscription.
*/
private synchronized SubscriptionHandler makeSubscription(PublishStreamKey
key) {
SubscriptionHandler sub = (SubscriptionHandler)
subscriptionsByKey.get(key);
if(sub != null) return sub;
if(subscriptionsByKey.size() >= MAX_COUNT) {
Logger.normal(this, "Rejecting subscription for "+key);
return null;
}
// Make a new one
sub = new SubscriptionHandler(key);
subscriptionsByKey.put(key, sub);
return sub;
}
public synchronized void remove(ClientSubscriptionHandler handler) {
PublishStreamKey key = handler.getKey().getKey();
SubscriptionHandler sub = (SubscriptionHandler)
subscriptionsByKey.get(key);
ClientSubscriptionHandler oldHandler = sub.setLocal(handler);
if(oldHandler != null && oldHandler != handler) {
Logger.error(this, "Already had a different handler: "+oldHandler+"
should be "+handler);
}
if(sub.shouldDrop()) {
drop(sub, key);
}
}
private synchronized void drop(SubscriptionHandler sub, PublishStreamKey
key) {
subscriptionsByKey.remove(key);
}
/**
* Handle a received packet.
*/
public void receivedPacket(PublishStreamKey key, long packetNumber, byte[]
packetData, PeerNode source) {
SubscriptionHandler sub;
synchronized(this) {
sub = (SubscriptionHandler) subscriptionsByKey.get(key);
}
if(sub == null) {
Logger.normal(this, "Dropped sub packet from "+source+" on "+key);
return;
}
sub.processPacket(packetNumber, packetData, source);
}
}
--- NEW FILE: ClientSubscriptionHandler.java ---
package freenet.node;
import java.util.Iterator;
import java.util.LinkedList;
import freenet.keys.ClientPublishStreamKey;
import freenet.support.Logger;
/**
* Contains a list of ClientSubscriptionImpl's.
* When they all disconnect, we unregister.
*/
public class ClientSubscriptionHandler {
final ClientPublishStreamKey key;
final SubscriptionManager sm;
boolean finished = false;
private final LinkedList clientSubs;
ClientSubscriptionHandler(SubscriptionManager manager,
ClientPublishStreamKey key) {
clientSubs = new LinkedList();
sm = manager;
this.key = key;
}
synchronized void add(ClientSubscriptionImpl sub) {
if(finished)
throw new IllegalArgumentException();
clientSubs.add(sub);
}
synchronized void remove(ClientSubscriptionImpl sub) {
clientSubs.remove(sub);
if(clientSubs.size() == 0) {
finished = true;
sm.remove(this);
}
}
public ClientPublishStreamKey getKey() {
return key;
}
/**
* Received data!
*/
public void processPacket(long packetNumber, byte[] packetData) {
byte[] finalData = key.decrypt(packetNumber, packetData);
if(finalData == null) {
Logger.error(this, "Packet did not decrypt");
return;
}
for(Iterator i=clientSubs.iterator();i.hasNext();) {
ClientSubscriptionImpl impl = (ClientSubscriptionImpl)i.next();
impl.processPacket(packetNumber, finalData);
}
}
}
--- NEW FILE: PublishHandlerSender.java ---
package freenet.node;
import java.util.HashSet;
import freenet.io.comm.DMT;
import freenet.io.comm.DisconnectedException;
import freenet.io.comm.Message;
import freenet.io.comm.MessageFilter;
import freenet.io.comm.NotConnectedException;
import freenet.keys.PublishStreamKey;
import freenet.support.Fields;
import freenet.support.Logger;
import freenet.support.ShortBuffer;
/**
* Forwards a PublishData message. Routed, exactly as an insert.
* Also communicates with the previous hop. For inserts and requests
* we split this into two - Handler and Sender - but for this it is
* simpler to have one, especially as there is no coalescing.
*/
public class PublishHandlerSender implements Runnable {
// Basics
final PublishStreamKey key;
final double target;
final long uid;
private short htl;
final PeerNode source;
final Node node;
final long packetNumber;
final byte[] packetData;
final Thread t;
private double closestLocation;
// Constants - FIXME way too high?
static final int ACCEPTED_TIMEOUT = 10000;
static final int COMPLETE_TIMEOUT = 120000;
public String toString() {
return super.toString()+": id="+uid+", source="+source+",
packetNo="+packetNumber+", data="+Fields.hashCode(packetData);
}
/**
* Create a PublishHandlerSender.
* closestLocation must be reset by the caller if we are closer to target
* than the node sending the request.
*/
public PublishHandlerSender(PublishStreamKey key2, long id, long packetNo,
byte[] data, Node n, PeerNode src, double closestLoc) {
node = n;
key = key2;
target = key.toNormalizedDouble();
uid = id;
packetNumber = packetNo;
packetData = data;
source = src;
closestLocation = closestLoc;
htl = Node.MAX_HTL;
t = new Thread(this);
t.setDaemon(true);
t.start();
}
/**
* Create from an incoming Message. Does not check
* UID/key for collision; this must be done by caller.
* Will do everything else though, including resetting
* the closestLocation if necessary.
*/
public PublishHandlerSender(Message m, Node n) {
this.node = n;
key = (PublishStreamKey) m.getObject(DMT.KEY);
target = key.toNormalizedDouble();
uid = m.getLong(DMT.UID);
packetNumber = m.getLong(DMT.STREAM_SEQNO);
source = (PeerNode) m.getSource();
packetData = ((ShortBuffer)m.getObject(DMT.DATA)).getData();
closestLocation = m.getDouble(DMT.NEAREST_LOCATION);
htl = m.getShort(DMT.HTL);
double myLoc = n.lm.getLocation().getValue();
if(Math.abs(myLoc - target) < Math.abs(closestLocation - target))
closestLocation = myLoc;
t = new Thread(this);
t.setDaemon(true);
t.start();
}
/** Very similar to InsertSender.run() */
public void run() {
Logger.minor(this, "Running "+this);
try {
// Copy it as it will be decrypted in place
byte[] packetDataCopy = new byte[packetData.length];
System.arraycopy(packetData, 0, packetDataCopy, 0,
packetData.length);
node.subscriptions.receivedPacket(key, packetNumber,
packetDataCopy, source);
short origHTL = htl;
if(source != null) {
Message msg = DMT.createFNPAccepted(uid);
try {
source.sendAsync(msg, null);
} catch (NotConnectedException e) {
Logger.normal(this, "Not connected sending Accepted on
"+this);
}
}
HashSet nodesRoutedTo = new HashSet();
// Don't check whether source is connected; forward anyway
while(true) {
if(htl == 0) {
// Ran out of hops; success
if(source != null) {
Message msg = DMT.createFNPPublishDataSucceeded(uid);
try {
source.sendAsync(msg, null);
} catch (NotConnectedException e) {
Logger.minor(this, "Not connected sending
FNPPublishDataSucceeded on "+this);
}
}
return;
}
// Route
PeerNode next;
// Can backtrack, so only route to nodes closer than we are to
target.
double nextValue;
synchronized(node.peers) {
next = node.peers.closerPeer(source, nodesRoutedTo, target,
true);
if(next != null)
nextValue = next.getLocation().getValue();
else
nextValue = -1.0;
}
if(next == null) {
// Backtrack
Logger.minor(this, "No more routes");
if(source != null) {
Message msg = DMT.createFNPRouteNotFound(uid, htl);
try {
source.sendAsync(msg, null);
} catch (NotConnectedException e) {
Logger.normal(this, "Not connected when RNFing on
"+this);
}
} else {
Logger.minor(this, "RNF on "+this);
}
return;
}
Logger.minor(this, "Routing insert to "+next);
nodesRoutedTo.add(next);
if(Math.abs(target - nextValue) > Math.abs(target -
closestLocation)) {
Logger.minor(this, "Backtracking: target="+target+"
next="+nextValue+" closest="+closestLocation);
htl = node.decrementHTL(source, htl);
}
// Now we send it
/**
* Possible responses:
* Accepted
* RejectedOverload
* RejectedLoop
* (timeout)
*/
Message msg = DMT.createFNPPublishData(htl, packetData, key,
packetNumber, uid, closestLocation);
MessageFilter mfAccepted =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
MessageFilter mfRejectedLoop =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
MessageFilter mfRejectedOverload =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
MessageFilter mfPublishDataInvalid =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPPublishDataInvalid);
// mfRejectedOverload must be the last thing in the or
// So its or pointer remains null
// Otherwise we need to recreate it below
MessageFilter mf =
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload.or(mfPublishDataInvalid)));
mfRejectedOverload.clearOr();
try {
next.send(msg);
} catch (NotConnectedException e) {
Logger.minor(this, "Not connected to "+next+" - skipping");
continue;
}
try {
msg = null;
msg = node.usm.waitFor(mf);
} catch (DisconnectedException e1) {
Logger.minor(this, "Not connected to "+next+" while
waiting, skipping");
continue;
}
if(msg == null) {
// Timeout waiting for Accepted
Logger.normal(this, "Timed out waiting for Accepted on
"+this+" from "+next);
continue;
}
if(msg.getSpec() == DMT.FNPRejectedLoop) {
Logger.minor(this, "Rejected: loop for "+this);
continue;
}
if(msg.getSpec() == DMT.FNPRejectedOverload) {
// Propagate back to source; fatal
if(source != null) {
Message m = DMT.createFNPRejectedOverload(uid);
try {
source.sendAsync(m,null);
} catch (NotConnectedException e2) {
Logger.normal(this, "Source disconnected relaying
rejected:overload from "+next+" for "+this);
}
} else {
Logger.normal(this, "FNPDataPublish rejected: overload
from "+next+" for "+this);
}
return;
}
if(msg.getSpec() != DMT.FNPAccepted) {
throw new IllegalStateException("Unrecognized message:
"+msg+" while waiting for Accepted on "+this);
}
// Got an Accepted; wait for a success
MessageFilter mfSucceeded =
MessageFilter.create().setTimeout(COMPLETE_TIMEOUT).setSource(next).setField(DMT.UID,
uid).setType(DMT.FNPPublishDataSucceeded);
MessageFilter mfRNF =
MessageFilter.create().setTimeout(COMPLETE_TIMEOUT).setSource(next).setField(DMT.UID,
uid).setType(DMT.FNPRouteNotFound);
mfRejectedOverload.clearOr();
mfPublishDataInvalid.clearOr();
mf =
mfSucceeded.or(mfRNF.or(mfRejectedOverload.or(mfPublishDataInvalid)));
msg = null;
try {
node.usm.waitFor(mf);
} catch (DisconnectedException e3) {
Logger.error(this, "Disconnected from next: "+next+" while
waiting for completion on "+this);
continue; // can cause load multiplication, hence the
error; what else are we supposed to do though?
}
if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
// Timeout
Logger.error(this, "Timeout waiting for completion of
"+this);
// Propagate back to source; fatal
if(source != null) {
Message m = DMT.createFNPRejectedOverload(uid);
try {
source.sendAsync(m,null);
} catch (NotConnectedException e2) {
Logger.normal(this, "Source disconnected relaying
rejected:overload from "+next+" for "+this);
}
} else {
Logger.normal(this, "FNPDataPublish rejected: overload
from "+next+" for "+this);
}
return;
}
if(msg.getSpec() == DMT.FNPRouteNotFound) {
Logger.minor(this, "Rejected: RNF");
// Still gets the data - but not yet
short newHtl = msg.getShort(DMT.HTL);
if(htl > newHtl) htl = newHtl;
continue;
}
if(msg.getSpec() == DMT.FNPPublishDataInvalid) {
Logger.error(this, "Got data invalid from "+next+" for
"+this);
// FIXME: check validity ourself, propagate error if needed
continue;
}
if(msg.getSpec() != DMT.FNPPublishDataSucceeded) {
throw new IllegalStateException("Got unexpected message
"+msg+" waiting for completion on "+this);
}
// Success
return;
}
} catch (Throwable t) {
Logger.error(this, "Caught "+t+" in "+this, t);
}
}
}
--- NEW FILE: ClientSubscription.java ---
package freenet.node;
import freenet.keys.ClientPublishStreamKey;
/**
* A client subscription.
*/
public interface ClientSubscription {
public ClientPublishStreamKey getKey();
public SubscriptionCallback getCallback();
public void disconnect();
}
--- NEW FILE: SubscriptionHandler.java ---
package freenet.node;
import java.util.LinkedList;
import freenet.keys.PublishStreamKey;
import freenet.support.Logger;
import freenet.support.NumberedItem;
import freenet.support.NumberedRecentItems;
/**
* A single subscription.
* May have a parent node, or may be the root.
* May have many child nodes.
*/
public class SubscriptionHandler {
static final int KEEP_PACKETS = 32;
final PublishStreamKey key;
ClientSubscriptionHandler localSubscribers;
final LinkedList subscriberPeers;
final NumberedRecentItems packets;
public SubscriptionHandler(PublishStreamKey k) {
key = k;
subscriberPeers = new LinkedList();
packets = new NumberedRecentItems(KEEP_PACKETS, true);
}
/**
* Set the local subscribers handler.
* @return The previous local subscribers handler - should be null!
*/
public synchronized ClientSubscriptionHandler
setLocal(ClientSubscriptionHandler csh) {
ClientSubscriptionHandler h = localSubscribers;
localSubscribers = csh;
return h;
}
public synchronized boolean shouldDrop() {
return (subscriberPeers.isEmpty() && localSubscribers == null);
}
/**
* Process an incoming PublishData packet.
*/
public void processPacket(long packetNumber, byte[] packetData, PeerNode
source) {
// First, have we seen it before?
if(!packets.add(new PacketItem(packetNumber, packetData))) {
Logger.minor(this, "Got packet "+packetNumber+" on stream "+key+"
twice");
return;
}
// FIXME: redistribute it to node subscribers
// Redistribute it to local subscribers
localSubscribers.processPacket(packetNumber, packetData);
}
public class PacketItem implements NumberedItem {
long packetNumber;
byte[] data;
public PacketItem(long packetNumber, byte[] packetData) {
this.packetNumber = packetNumber;
data = packetData;
}
public long getNumber() {
return packetNumber;
}
}
}
Index: Node.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/Node.java,v
retrieving revision 1.47
retrieving revision 1.48
diff -u -w -r1.47 -r1.48
--- Node.java 7 Sep 2005 14:42:53 -0000 1.47
+++ Node.java 15 Sep 2005 18:16:04 -0000 1.48
@@ -22,7 +22,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashSet;
+import java.util.Hashtable;
import freenet.crypt.DiffieHellman;
import freenet.crypt.RandomSource;
@@ -31,6 +31,7 @@
import freenet.io.comm.DisconnectedException;
import freenet.io.comm.Message;
import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
import freenet.io.comm.Peer;
import freenet.io.comm.PeerParseException;
import freenet.io.comm.UdpSocketManager;
@@ -39,6 +40,7 @@
import freenet.keys.CHKVerifyException;
import freenet.keys.ClientCHK;
import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientPublishStreamKey;
import freenet.keys.NodeCHK;
import freenet.store.BaseFreenetStore;
import freenet.store.FreenetStore;
@@ -87,6 +89,11 @@
private final HashMap transferringRequestSenders;
/** InsertSender's currently running, by KeyHTLPair */
private final HashMap insertSenders;
+ /** Subscriptions */
+ final SubscriptionManager subscriptions;
+
+ /** Locally published stream contexts */
+ private final Hashtable localStreamContexts;
private final HashSet runningUIDs;
@@ -276,6 +283,7 @@
ps = new PacketSender(this);
peers = new PeerManager(this, prefix+"peers-"+portNumber);
+ subscriptions = new SubscriptionManager(this);
try {
usm = new UdpSocketManager(portNumber);
@@ -289,6 +297,7 @@
decrementAtMax = random.nextDouble() <= DECREMENT_AT_MAX_PROB;
decrementAtMin = random.nextDouble() <= DECREMENT_AT_MIN_PROB;
bootID = random.nextLong();
+ localStreamContexts = new Hashtable();
peers.writePeers();
}
@@ -688,4 +697,69 @@
myName = key;
writeNodeFile();
}
+
+ /** Create a publish stream and create context needed to insert data on
it. */
+ public ClientPublishStreamKey createPublishStream() {
+ ClientPublishStreamKey key =
+ ClientPublishStreamKey.createRandom(random);
+ makePublishContext(key);
+ return key;
+ }
+
+ private PublishContext makePublishContext(ClientPublishStreamKey key) {
+ synchronized(localStreamContexts) {
+ PublishContext ctx = (PublishContext) localStreamContexts.get(key);
+ if(ctx == null) {
+ // Which it usually does
+ ctx = new PublishContext(key, this);
+ localStreamContexts.put(key, ctx);
+ return ctx;
+ } else {
+ Logger.error(this, "Reusing existing publish context for
"+key);
+ // Reuse
+ return ctx;
+ }
+ }
+ }
+
+ public PublishHandlerSender makePublishHandlerSender(Message m) {
+ long id = m.getLong(DMT.UID);
+ Long lid = new Long(id);
+ boolean reject = false;
+ try {
+ synchronized(this) {
+ if(recentlyCompletedIDs.contains(lid))
+ reject = true;
+ else if(runningUIDs.contains(lid))
+ reject = true;
+ else runningUIDs.add(lid);
+ }
+ if(reject) {
+ try {
+ Message msg = DMT.createFNPRejectedLoop(id);
+ ((PeerNode)m.getSource()).sendAsync(msg, null);
+ } catch (NotConnectedException e) {
+ Logger.normal(this, "Lost connection refusing PublishData:
"+id);
+ }
+ return null;
+ } else {
+ return new PublishHandlerSender(m, this);
+ }
+ } catch (Throwable t) {
+ synchronized(this) {
+ runningUIDs.remove(lid);
+ recentlyCompletedIDs.remove(lid);
+ }
+ Logger.error(this, "Caught "+t+" handling PublishData "+id, t);
+ return null;
+ }
+ }
+
+ public void publish(ClientPublishStreamKey key, byte[] data) {
+ makePublishContext(key).publish(data);
+ }
+
+ public ClientSubscription subscribe(ClientPublishStreamKey key,
SubscriptionCallback cb) {
+ return subscriptions.localSubscribe(key, cb);
+ }
}
Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/Version.java,v
retrieving revision 1.130
retrieving revision 1.131
diff -u -w -r1.130 -r1.131
--- Version.java 7 Sep 2005 18:40:56 -0000 1.130
+++ Version.java 15 Sep 2005 18:16:04 -0000 1.131
@@ -20,10 +20,10 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 135;
+ public static final int buildNumber = 136;
/** Oldest build of Fred we will talk to */
- public static final int lastGoodBuild = 131;
+ public static final int lastGoodBuild = 136;
/** The highest reported build of fred */
public static int highestSeenBuild = buildNumber;
Index: NodeDispatcher.java
===================================================================
RCS file:
/cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/NodeDispatcher.java,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -w -r1.22 -r1.23
--- NodeDispatcher.java 7 Sep 2005 15:22:16 -0000 1.22
+++ NodeDispatcher.java 15 Sep 2005 18:16:04 -0000 1.23
@@ -6,6 +6,7 @@
import freenet.io.comm.DMT;
import freenet.io.comm.Dispatcher;
import freenet.io.comm.Message;
+import freenet.io.comm.MessageType;
import freenet.io.comm.NotConnectedException;
import freenet.support.Logger;
@@ -48,45 +49,52 @@
}
return true;
}
- if(m.getSpec() == DMT.FNPLocChangeNotification) {
+ MessageType spec = m.getSpec();
+ if(spec == DMT.FNPLocChangeNotification) {
double newLoc = m.getDouble(DMT.LOCATION);
source.updateLocation(newLoc);
return true;
- }
- if(m.getSpec() == DMT.FNPSwapRequest) {
+ } else if(spec == DMT.FNPSwapRequest) {
return node.lm.handleSwapRequest(m);
- }
- if(m.getSpec() == DMT.FNPSwapReply) {
+ } else if(spec == DMT.FNPSwapReply) {
return node.lm.handleSwapReply(m);
- }
- if(m.getSpec() == DMT.FNPSwapRejected) {
+ } else if(spec == DMT.FNPSwapRejected) {
return node.lm.handleSwapRejected(m);
- }
- if(m.getSpec() == DMT.FNPSwapCommit) {
+ } else if(spec == DMT.FNPSwapCommit) {
return node.lm.handleSwapCommit(m);
- }
- if(m.getSpec() == DMT.FNPSwapComplete) {
+ } else if(spec == DMT.FNPSwapComplete) {
return node.lm.handleSwapComplete(m);
- }
- if(m.getSpec() == DMT.FNPRoutedPing) {
+ } else if(spec == DMT.FNPRoutedPing) {
return handleRouted(m);
- }
- if(m.getSpec() == DMT.FNPRoutedPong) {
+ } else if(spec == DMT.FNPRoutedPong) {
return handleRoutedReply(m);
- }
- if(m.getSpec() == DMT.FNPRoutedRejected) {
+ } else if(spec == DMT.FNPRoutedRejected) {
return handleRoutedRejected(m);
- }
- if(m.getSpec() == DMT.FNPDataRequest) {
+ } else if(spec == DMT.FNPDataRequest) {
return handleDataRequest(m);
- }
- if(m.getSpec() == DMT.FNPInsertRequest) {
+ } else if(spec == DMT.FNPInsertRequest) {
return handleInsertRequest(m);
+ } else if(spec == DMT.FNPPublishData) {
+ return handlePublishData(m);
}
+// } // SubscribeData, SubscribeRestarted etc handled by
SubscribeSender.
return false;
}
/**
+ * Handle an FNPPublishData message.
+ * @return False to put it back onto the queue.
+ */
+ private boolean handlePublishData(Message m) {
+ // Create a PublishSender. This will do all the work.
+ // FIXME maybe we should check whether we've sent the packet before?
+ // It's not really a viable DoS but it is good practice...
+ PublishHandlerSender ps =
+ node.makePublishHandlerSender(m);
+ return true;
+ }
+
+ /**
* Handle an incoming FNPDataRequest.
*/
private boolean handleDataRequest(Message m) {
Index: SimpleClient.java
===================================================================
RCS file:
/cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/SimpleClient.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -w -r1.1 -r1.2
--- SimpleClient.java 16 Jul 2005 12:00:34 -0000 1.1
+++ SimpleClient.java 15 Sep 2005 18:16:04 -0000 1.2
@@ -2,6 +2,7 @@
import freenet.keys.ClientCHK;
import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientPublishStreamKey;
/**
* @author amphibian
@@ -13,8 +14,32 @@
*/
public interface SimpleClient {
+ /**
+ * Fetch a key. Return null if cannot retrieve it.
+ */
public ClientCHKBlock getCHK(ClientCHK key);
+ /**
+ * Insert a key.
+ */
public void putCHK(ClientCHKBlock key);
+ /**
+ * Create a publish/subscribe stream key. Create any context
+ * needed to insert it.
+ */
+ public ClientPublishStreamKey createPublishStream();
+
+ /**
+ * Publish a block of data to a publish/subscribe key.
+ */
+ public void publish(ClientPublishStreamKey key, byte[] data);
+
+ /**
+ * Subscribe to a stream.
+ * @param key The stream key.
+ * @param cb Callback to notify when we get data.
+ * @return True if the subscribe succeeded.
+ */
+ public ClientSubscription subscribe(ClientPublishStreamKey key,
SubscriptionCallback cb);
}
Index: TextModeClientInterface.java
===================================================================
RCS file:
/cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/TextModeClientInterface.java,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -w -r1.22 -r1.23
--- TextModeClientInterface.java 7 Sep 2005 18:40:56 -0000 1.22
+++ TextModeClientInterface.java 15 Sep 2005 18:16:04 -0000 1.23
@@ -9,7 +9,9 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
+import java.util.Hashtable;
import freenet.crypt.RandomSource;
import freenet.io.comm.PeerParseException;
@@ -18,6 +20,7 @@
import freenet.keys.CHKEncodeException;
import freenet.keys.ClientCHK;
import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientPublishStreamKey;
import freenet.keys.FreenetURI;
import freenet.support.Logger;
import freenet.support.SimpleFieldSet;
@@ -31,12 +34,14 @@
*/
public class TextModeClientInterface implements Runnable {
- RandomSource r;
- Node n;
+ final RandomSource r;
+ final Node n;
+ final Hashtable streams;
TextModeClientInterface(Node n) {
this.n = n;
this.r = n.random;
+ streams = new Hashtable();
new Thread(this).start();
}
@@ -54,6 +59,9 @@
System.out.println("PUT:<text> - put a single line of text to a CHK
and return the key.");
System.out.println("PUTFILE:<filename> - put a file from disk.");
System.out.println("GETFILE:<filename> - fetch a key and put it in a
file. If the key includes a filename we will use it but we will not overwrite
local files.");
+ System.out.println("PUBLISH:<name> - create a publish/subscribe stream
called <name>");
+ System.out.println("PUSH:<name>:<text> - publish a single line of text
to the stream named");
+ System.out.println("SUBSCRIBE:<key> - subscribe to a publish/subscribe
stream by key");
System.out.println("CONNECT:<filename> - connect to a node from its
ref in a file.");
System.out.println("CONNECT:\n<noderef including an End on a line by
itself> - enter a noderef directly.");
System.out.println("NAME:<new node name> - change the node's name.");
@@ -75,7 +83,7 @@
/**
*
*/
- private void processLine(BufferedReader reader) {
+ private void processLine(BufferedReader reader) throws
UnsupportedEncodingException {
String line;
try {
line = reader.readLine();
@@ -84,7 +92,7 @@
return;
}
if(line == null) line = "QUIT";
- String uline = line.toLowerCase();
+ String uline = line.toUpperCase();
if(uline.startsWith("GET:")) {
// Should have a key next
String key = line.substring("GET:".length());
@@ -252,6 +260,53 @@
System.out.println("Threw: "+t);
t.printStackTrace();
}
+ } else if(line.startsWith("PUBLISH:")) {
+ line = line.substring("PUBLISH:".length());
+ line = line.trim();
+ System.out.println("Stream name: "+line);
+ ClientPublishStreamKey key = n.createPublishStream();
+ FreenetURI streamKey = key.getURI();
+ streamKey = streamKey.setDocName(line);
+ System.out.println("Stream key: "+streamKey);
+ streams.put(line, key);
+ } else if(line.startsWith("PUSH:")) {
+ // PUSH:<name>:<text>
+ line = line.substring("PUSH:".length());
+ line = line.trim();
+ int index = line.indexOf(':');
+ if(index == -1) {
+ System.err.println("What do you want me to publish?");
+ return;
+ }
+ String name = line.substring(0, index);
+ ClientPublishStreamKey key = (ClientPublishStreamKey)
streams.get(name);
+ if(key == null) {
+ System.err.println("Could not find stream called "+name);
+ } else {
+ String content;
+ if(line.length() > index+1) {
+ content = line.substring(index+1);
+ } else {
+ content = readLines(reader, false);
+ }
+ System.out.println("Publishing to "+key);
+ System.out.println("Data to publish:\n"+content);
+ n.publish(key, content.getBytes("UTF-8"));
+ }
+ } else if(uline.startsWith("SUBSCRIBE:")) {
+ line = line.substring("SUBSCRIBE:".length());
+ line = line.trim();
+ try {
+ FreenetURI uri = new FreenetURI(line);
+ ClientPublishStreamKey key = new ClientPublishStreamKey(uri);
+ n.subscribe(key, new MySubscriptionCallback(uri.getDocName()));
+ System.out.println("Subscribed.");
+ } catch (MalformedURLException e1) {
+ System.err.println("Invalid URI: "+e1.getMessage());
+ e1.printStackTrace();
+ return;
+ }
+
} else if(uline.startsWith("STATUS")) {
SimpleFieldSet fs = n.exportFieldSet();
System.out.println(fs.toString());
@@ -408,6 +463,33 @@
return sb.toString();
}
+ /**
+ *
+ * SubscriptionCallback that dumps output to stdout.
+ */
+ public class MySubscriptionCallback implements SubscriptionCallback {
+
+ final String name;
+
+ public MySubscriptionCallback(String string) {
+ name = string;
+ }
+
+ public void got(long packetNumber, byte[] data) {
+ System.out.println(name+":"+packetNumber+":"+new String(data));
+ }
+
+ public void lostConnection() {
+ System.out.println(name+":LOST CONNECTION");
+ }
+ public void restarted() {
+ System.out.println(name+":RESTARTED");
+ }
+
+ public void connected() {
+ System.out.println(name+":CONNECTED");
+ }
+ }
}