Update of /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv20948/src/freenet/node

Modified Files:
        Version.java SubscriptionHandler.java 
        TextModeClientInterface.java SubscriptionManager.java 
Log Message:
137:
Beginnings of routed subscribe support.
UI changes - especially SUBFILE.

Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/Version.java,v
retrieving revision 1.131
retrieving revision 1.132
diff -u -w -r1.131 -r1.132
--- Version.java        15 Sep 2005 18:16:04 -0000      1.131
+++ Version.java        20 Sep 2005 18:19:04 -0000      1.132
@@ -20,7 +20,7 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 136;
+       public static final int buildNumber = 137;

        /** Oldest build of Fred we will talk to */
        public static final int lastGoodBuild = 136;

Index: SubscriptionHandler.java
===================================================================
RCS file: 
/cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/SubscriptionHandler.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -w -r1.1 -r1.2
--- SubscriptionHandler.java    15 Sep 2005 18:16:04 -0000      1.1
+++ SubscriptionHandler.java    20 Sep 2005 18:19:04 -0000      1.2
@@ -1,7 +1,5 @@
 package freenet.node;

-import java.util.LinkedList;
-
 import freenet.keys.PublishStreamKey;
 import freenet.support.Logger;
 import freenet.support.NumberedItem;
@@ -18,12 +16,12 @@

     final PublishStreamKey key;
     ClientSubscriptionHandler localSubscribers;
-    final LinkedList subscriberPeers;
+    PeerNode[] subscriberPeers;
     final NumberedRecentItems packets;

     public SubscriptionHandler(PublishStreamKey k) {
         key = k;
-        subscriberPeers = new LinkedList();
+        subscriberPeers = null;
         packets = new NumberedRecentItems(KEEP_PACKETS, true);
     }

@@ -38,7 +36,34 @@
     }

     public synchronized boolean shouldDrop() {
-        return (subscriberPeers.isEmpty() && localSubscribers == null);
+        return subscriberPeersEmpty() && localSubscribers == null;
+    }
+
+    /**
+     * @return True if there are no subscriber peers.
+     */
+    private final boolean subscriberPeersEmpty() {
+        PeerNode[] peers = subscriberPeers;
+        return (peers == null || peers.length == 0);
+    }
+
+    public void addSubscriberNode(PeerNode pn, long lastSeen) {
+        synchronized(this) {
+            if(subscriberPeers == null || subscriberPeers.length == 0) {
+                subscriberPeers = new PeerNode[] { pn };
+            } else {
+                PeerNode[] peers = new PeerNode[subscriberPeers.length+1];
+                System.arraycopy(subscriberPeers, 0, peers, 0, 
subscriberPeers.length);
+                peers[peers.length-1] = pn;
+                subscriberPeers = peers;
+            }
+        }
+        NumberedItem[] items = packets.getAfter(lastSeen);
+        if(items == null || items.length == 0) return;
+        for(int i=0;i<items.length;i++) {
+            PacketItem item = (PacketItem)items[i];
+            item.forwardTo(pn);
+        }
     }

     /**
@@ -46,11 +71,21 @@
      */
     public void processPacket(long packetNumber, byte[] packetData, PeerNode 
source) {
         // First, have we seen it before?
-        if(!packets.add(new PacketItem(packetNumber, packetData))) {
+        PacketItem item = new PacketItem(packetNumber, packetData);
+        if(!packets.add(item)) {
             Logger.minor(this, "Got packet "+packetNumber+" on stream "+key+" 
twice");
             return;
         }
-        // FIXME: redistribute it to node subscribers
+        PeerNode[] peers;
+        // We don't strictly need to synchronize, but
+        // if we don't we may lose packets.
+        synchronized(this) {
+            peers = subscriberPeers;
+        }
+        if(peers != null)
+            for(int i=0;i<peers.length;i++)
+                item.forwardTo(peers[i]);
+        
         // Redistribute it to local subscribers
         localSubscribers.processPacket(packetNumber, packetData);
     }
@@ -65,6 +100,18 @@
             data = packetData;
         }

+        /**
+         * Forward this packet to a subscriber node.
+         * As an FNPSubscribeData. NOT an FNPPublishData.
+         * DO NOT CALL WITH LOCKS HELD, as sends packets.
+         * @param pn Node to send to.
+         */
+        public void forwardTo(PeerNode pn) {
+            
+            // TODO Auto-generated method stub
+            
+        }
+
         public long getNumber() {
             return packetNumber;
         }

Index: TextModeClientInterface.java
===================================================================
RCS file: 
/cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/TextModeClientInterface.java,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -w -r1.23 -r1.24
--- TextModeClientInterface.java        15 Sep 2005 18:16:04 -0000      1.23
+++ TextModeClientInterface.java        20 Sep 2005 18:19:04 -0000      1.24
@@ -9,7 +9,9 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.io.UnsupportedEncodingException;
+import java.io.Writer;
 import java.net.MalformedURLException;
 import java.util.Hashtable;

@@ -37,11 +39,13 @@
     final RandomSource r;
     final Node n;
     final Hashtable streams;
+    private Writer subscribedDataStream;

     TextModeClientInterface(Node n) {
         this.n = n;
         this.r = n.random;
         streams = new Hashtable();
+        subscribedDataStream = new OutputStreamWriter(System.out);
         new Thread(this).start();
     }

@@ -65,6 +69,7 @@
         System.out.println("CONNECT:<filename> - connect to a node from its 
ref in a file.");
         System.out.println("CONNECT:\n<noderef including an End on a line by 
itself> - enter a noderef directly.");
         System.out.println("NAME:<new node name> - change the node's name.");
+        System.out.println("SUBFILE:<filename> - append all data received from 
subscriptions to a file, rather than sending it to stdout.");
         System.out.println("STATUS - display some status information on the 
node including its reference and connections.");
         System.out.println("QUIT - exit the program");
         // Read command, and data
@@ -93,6 +98,7 @@
         }
         if(line == null) line = "QUIT";
         String uline = line.toUpperCase();
+        Logger.minor(this, "Command: "+line);
         if(uline.startsWith("GET:")) {
             // Should have a key next
             String key = line.substring("GET:".length());
@@ -300,7 +306,7 @@
                 FreenetURI uri = new FreenetURI(line);
                 ClientPublishStreamKey key = new ClientPublishStreamKey(uri);
                 n.subscribe(key, new MySubscriptionCallback(uri.getDocName()));
-                System.out.println("Subscribed.");
+                System.out.println("Subscribed to "+uri.getDocName()+".");
             } catch (MalformedURLException e1) {
                 System.err.println("Invalid URI: "+e1.getMessage());
                 e1.printStackTrace();
@@ -341,6 +347,20 @@
                 if(content.equals("")) return;
                 connect(content);
             }
+        } else if(uline.startsWith("SUBFILE:")) {
+            String filename = line.substring("SUBFILE:".length()).trim();
+            System.out.println("Writing all received subscription data to 
"+filename);
+            try {
+                FileOutputStream fos = new FileOutputStream(filename, true);
+                OutputStreamWriter w = new OutputStreamWriter(fos);
+                // Test it
+                w.write("Opened at "+System.currentTimeMillis()+" for writing 
subscribed data\n");
+                subscribedDataStream = w;
+                w.flush();
+            } catch (IOException e) {
+                System.err.println("Could not use file: "+e.getMessage());
+            }
+            
         } else if(uline.startsWith("NAME:")) {
             System.out.println("Node name currently: "+n.myName);
             String key = line.substring("NAME:".length());
@@ -466,6 +486,7 @@
     /**
      * 
      * SubscriptionCallback that dumps output to stdout.
+     * FIXME this might block if stdout is redirected and disk is full...
      */
     public class MySubscriptionCallback implements SubscriptionCallback {

@@ -476,19 +497,47 @@
         }

         public void got(long packetNumber, byte[] data) {
-            System.out.println(name+":"+packetNumber+":"+new String(data));
+            try {
+                subscribedDataStream.write(name+":"+packetNumber+":"+new 
String(data)+"\n");
+                subscribedDataStream.flush();
+            } catch (IOException e) {
+                String s = "Error writing to subscriptions output file - disk 
full? "+e.getMessage();
+                Logger.error(this, s);
+                System.err.println(s);
+            }
         }

         public void lostConnection() {
-            System.out.println(name+":LOST CONNECTION");
+            try {
+                subscribedDataStream.write(name+":LOST CONNECTION\n");
+                subscribedDataStream.flush();
+            } catch (IOException e) {
+                String s = "Error writing to subscriptions output file - disk 
full? "+e.getMessage();
+                Logger.error(this, s);
+                System.err.println(s);
+            }
         }

         public void restarted() {
-            System.out.println(name+":RESTARTED");
+            try {
+                subscribedDataStream.write(name+":RESTARTED\n");
+                subscribedDataStream.flush();
+            } catch (IOException e) {
+                String s = "Error writing to subscriptions output file - disk 
full? "+e.getMessage();
+                Logger.error(this, s);
+                System.err.println(s);
+            }
         }

         public void connected() {
-            System.out.println(name+":CONNECTED");
+            try {
+                subscribedDataStream.write(name+":CONNECTED\n");
+                subscribedDataStream.flush();
+            } catch (IOException e) {
+                String s = "Error writing to subscriptions output file - disk 
full? "+e.getMessage();
+                Logger.error(this, s);
+                System.err.println(s);
+            }
         }
     }


Index: SubscriptionManager.java
===================================================================
RCS file: 
/cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/SubscriptionManager.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -w -r1.1 -r1.2
--- SubscriptionManager.java    15 Sep 2005 18:16:04 -0000      1.1
+++ SubscriptionManager.java    20 Sep 2005 18:19:04 -0000      1.2
@@ -47,6 +47,7 @@
         // Each pretends it is the root
         ClientSubscriptionHandler csh;
         boolean add = false;
+        Logger.minor(this, "Subscribing locally to "+key);
         synchronized(this) {
             csh = (ClientSubscriptionHandler) 
clientSubscriptionsByKey.get(key);
             if(csh == null) {
@@ -112,7 +113,7 @@
             sub = (SubscriptionHandler) subscriptionsByKey.get(key);
         }
         if(sub == null) {
-            Logger.normal(this, "Dropped sub packet from "+source+" on "+key);
+            Logger.normal(this, "Dropped sub packet from "+source+" on "+key+" 
- not subscribed");
             return;
         }
         sub.processPacket(packetNumber, packetData, source);


Reply via email to