Update of /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv20948/src/freenet/node
Modified Files:
Version.java SubscriptionHandler.java
TextModeClientInterface.java SubscriptionManager.java
Log Message:
137:
Beginnings of routed subscribe support.
UI changes - especially SUBFILE.
Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/Version.java,v
retrieving revision 1.131
retrieving revision 1.132
diff -u -w -r1.131 -r1.132
--- Version.java 15 Sep 2005 18:16:04 -0000 1.131
+++ Version.java 20 Sep 2005 18:19:04 -0000 1.132
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 136;
+ public static final int buildNumber = 137;
/** Oldest build of Fred we will talk to */
public static final int lastGoodBuild = 136;
Index: SubscriptionHandler.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/SubscriptionHandler.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -w -r1.1 -r1.2
--- SubscriptionHandler.java 15 Sep 2005 18:16:04 -0000 1.1
+++ SubscriptionHandler.java 20 Sep 2005 18:19:04 -0000 1.2
@@ -1,7 +1,5 @@
package freenet.node;
-import java.util.LinkedList;
-
import freenet.keys.PublishStreamKey;
import freenet.support.Logger;
import freenet.support.NumberedItem;
@@ -18,12 +16,12 @@
final PublishStreamKey key;
ClientSubscriptionHandler localSubscribers;
- final LinkedList subscriberPeers;
+ PeerNode[] subscriberPeers;
final NumberedRecentItems packets;
public SubscriptionHandler(PublishStreamKey k) {
key = k;
- subscriberPeers = new LinkedList();
+ subscriberPeers = null;
packets = new NumberedRecentItems(KEEP_PACKETS, true);
}
@@ -38,7 +36,34 @@
}
public synchronized boolean shouldDrop() {
- return (subscriberPeers.isEmpty() && localSubscribers == null);
+ return subscriberPeersEmpty() && localSubscribers == null;
+ }
+
+ /**
+ * @return True if there are no subscriber peers.
+ */
+ private final boolean subscriberPeersEmpty() {
+ PeerNode[] peers = subscriberPeers;
+ return (peers == null || peers.length == 0);
+ }
+
+ public void addSubscriberNode(PeerNode pn, long lastSeen) {
+ synchronized(this) {
+ if(subscriberPeers == null || subscriberPeers.length == 0) {
+ subscriberPeers = new PeerNode[] { pn };
+ } else {
+ PeerNode[] peers = new PeerNode[subscriberPeers.length+1];
+ System.arraycopy(subscriberPeers, 0, peers, 0, subscriberPeers.length);
+ peers[peers.length-1] = pn;
+ subscriberPeers = peers;
+ }
+ }
+ NumberedItem[] items = packets.getAfter(lastSeen);
+ if(items == null || items.length == 0) return;
+ for(int i=0;i<items.length;i++) {
+ PacketItem item = (PacketItem)items[i];
+ item.forwardTo(pn);
+ }
}
/**
@@ -46,11 +71,21 @@
*/
public void processPacket(long packetNumber, byte[] packetData, PeerNode source) {
// First, have we seen it before?
- if(!packets.add(new PacketItem(packetNumber, packetData))) {
+ PacketItem item = new PacketItem(packetNumber, packetData);
+ if(!packets.add(item)) {
Logger.minor(this, "Got packet "+packetNumber+" on stream "+key+" twice");
return;
}
- // FIXME: redistribute it to node subscribers
+ PeerNode[] peers;
+ // We don't strictly need to synchronize, but
+ // if we don't we may lose packets.
+ synchronized(this) {
+ peers = subscriberPeers;
+ }
+ if(peers != null)
+ for(int i=0;i<peers.length;i++)
+ item.forwardTo(peers[i]);
+
// Redistribute it to local subscribers
localSubscribers.processPacket(packetNumber, packetData);
}
@@ -65,6 +100,18 @@
data = packetData;
}
+ /**
+ * Forward this packet to a subscriber node.
+ * As an FNPSubscribeData. NOT an FNPPublishData.
+ * DO NOT CALL WITH LOCKS HELD, as sends packets.
+ * @param pn Node to send to.
+ */
+ public void forwardTo(PeerNode pn) {
+
+ // TODO Auto-generated method stub
+
+ }
+
public long getNumber() {
return packetNumber;
}
Index: TextModeClientInterface.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/TextModeClientInterface.java,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -w -r1.23 -r1.24
--- TextModeClientInterface.java 15 Sep 2005 18:16:04 -0000 1.23
+++ TextModeClientInterface.java 20 Sep 2005 18:19:04 -0000 1.24
@@ -9,7 +9,9 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
+import java.io.Writer;
import java.net.MalformedURLException;
import java.util.Hashtable;
@@ -37,11 +39,13 @@
final RandomSource r;
final Node n;
final Hashtable streams;
+ private Writer subscribedDataStream;
TextModeClientInterface(Node n) {
this.n = n;
this.r = n.random;
streams = new Hashtable();
+ subscribedDataStream = new OutputStreamWriter(System.out);
new Thread(this).start();
}
@@ -65,6 +69,7 @@
System.out.println("CONNECT:<filename> - connect to a node from its ref in a file.");
System.out.println("CONNECT:\n<noderef including an End on a line by itself> - enter a noderef directly.");
System.out.println("NAME:<new node name> - change the node's name.");
+ System.out.println("SUBFILE:<filename> - append all data received from subscriptions to a file, rather than sending it to stdout.");
System.out.println("STATUS - display some status information on the node including its reference and connections.");
System.out.println("QUIT - exit the program");
// Read command, and data
@@ -93,6 +98,7 @@
}
if(line == null) line = "QUIT";
String uline = line.toUpperCase();
+ Logger.minor(this, "Command: "+line);
if(uline.startsWith("GET:")) {
// Should have a key next
String key = line.substring("GET:".length());
@@ -300,7 +306,7 @@
FreenetURI uri = new FreenetURI(line);
ClientPublishStreamKey key = new ClientPublishStreamKey(uri);
n.subscribe(key, new MySubscriptionCallback(uri.getDocName()));
- System.out.println("Subscribed.");
+ System.out.println("Subscribed to "+uri.getDocName()+".");
} catch (MalformedURLException e1) {
System.err.println("Invalid URI: "+e1.getMessage());
e1.printStackTrace();
@@ -341,6 +347,20 @@
if(content.equals("")) return;
connect(content);
}
+ } else if(uline.startsWith("SUBFILE:")) {
+ String filename = line.substring("SUBFILE:".length()).trim();
+ System.out.println("Writing all received subscription data to "+filename);
+ try {
+ FileOutputStream fos = new FileOutputStream(filename, true);
+ OutputStreamWriter w = new OutputStreamWriter(fos);
+ // Test it
+ w.write("Opened at "+System.currentTimeMillis()+" for writing subscribed data\n");
+ subscribedDataStream = w;
+ w.flush();
+ } catch (IOException e) {
+ System.err.println("Could not use file: "+e.getMessage());
+ }
+
} else if(uline.startsWith("NAME:")) {
System.out.println("Node name currently: "+n.myName);
String key = line.substring("NAME:".length());
@@ -466,6 +486,7 @@
/**
*
* SubscriptionCallback that dumps output to stdout.
+ * FIXME this might block if stdout is redirected and disk is full...
*/
public class MySubscriptionCallback implements SubscriptionCallback {
@@ -476,19 +497,47 @@
}
public void got(long packetNumber, byte[] data) {
- System.out.println(name+":"+packetNumber+":"+new String(data));
+ try {
+ subscribedDataStream.write(name+":"+packetNumber+":"+new String(data)+"\n");
+ subscribedDataStream.flush();
+ } catch (IOException e) {
+ String s = "Error writing to subscriptions output file - disk full? "+e.getMessage();
+ Logger.error(this, s);
+ System.err.println(s);
+ }
}
public void lostConnection() {
- System.out.println(name+":LOST CONNECTION");
+ try {
+ subscribedDataStream.write(name+":LOST CONNECTION\n");
+ subscribedDataStream.flush();
+ } catch (IOException e) {
+ String s = "Error writing to subscriptions output file - disk full? "+e.getMessage();
+ Logger.error(this, s);
+ System.err.println(s);
+ }
}
public void restarted() {
- System.out.println(name+":RESTARTED");
+ try {
+ subscribedDataStream.write(name+":RESTARTED\n");
+ subscribedDataStream.flush();
+ } catch (IOException e) {
+ String s = "Error writing to subscriptions output file - disk full? "+e.getMessage();
+ Logger.error(this, s);
+ System.err.println(s);
+ }
}
public void connected() {
- System.out.println(name+":CONNECTED");
+ try {
+ subscribedDataStream.write(name+":CONNECTED\n");
+ subscribedDataStream.flush();
+ } catch (IOException e) {
+ String s = "Error writing to subscriptions output file - disk full? "+e.getMessage();
+ Logger.error(this, s);
+ System.err.println(s);
+ }
}
}
Index: SubscriptionManager.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/SubscriptionManager.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -w -r1.1 -r1.2
--- SubscriptionManager.java 15 Sep 2005 18:16:04 -0000 1.1
+++ SubscriptionManager.java 20 Sep 2005 18:19:04 -0000 1.2
@@ -47,6 +47,7 @@
// Each pretends it is the root
ClientSubscriptionHandler csh;
boolean add = false;
+ Logger.minor(this, "Subscribing locally to "+key);
synchronized(this) {
csh = (ClientSubscriptionHandler) clientSubscriptionsByKey.get(key);
if(csh == null) {
@@ -112,7 +113,7 @@
sub = (SubscriptionHandler) subscriptionsByKey.get(key);
}
if(sub == null) {
- Logger.normal(this, "Dropped sub packet from "+source+" on "+key);
+ Logger.normal(this, "Dropped sub packet from "+source+" on "+key+" - not subscribed");
return;
}
sub.processPacket(packetNumber, packetData, source);