Update of /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv7319/src/freenet/node

Modified Files:
        Node.java Version.java NodeDispatcher.java SimpleClient.java 
        TextModeClientInterface.java 
Added Files:
        SubscriptionCallback.java PublishContext.java 
        ClientSubscriptionImpl.java SubscriptionManager.java 
        ClientSubscriptionHandler.java PublishHandlerSender.java 
        ClientSubscription.java SubscriptionHandler.java 
Log Message:
Build 136:
Initial publish/subscribe support.
Publish works. Subscribe requests are not yet routed; we just subscribe 
locally, if the stream happens to pass through us.

--- NEW FILE: SubscriptionCallback.java ---
package freenet.node;

/**
 * Publish/Subscribe subscriber callback.
 */
public interface SubscriptionCallback {

    void got(long packetNumber, byte[] data);

    void lostConnection();

    void restarted();

    void connected();

}

--- NEW FILE: PublishContext.java ---
package freenet.node;

import freenet.keys.ClientPublishStreamKey;

/**
 * Context for a locally originated publish stream.
 */
public class PublishContext {

    final ClientPublishStreamKey key;
    final Node node;
    /** Last packet number. Incremented each time, modulo 2^63-1 */
    private long lastPacketNumber;

    /**
     * @param key
     */
    public PublishContext(ClientPublishStreamKey key, Node node) {
        this.key = key;
        this.node = node;
        lastPacketNumber = Math.abs(node.random.nextLong());
    }

    /**
     * Publish a block of data to the stream. Must not exceed
     * size limit.
     */
    public void publish(byte[] data) {
        long packetNumber;
        synchronized(this) {
            packetNumber = lastPacketNumber;
            long next = lastPacketNumber+1;
            if(next < 0) next = 0;
            lastPacketNumber = next;
        }
        byte[] encrypted = 
            key.encrypt(packetNumber, data, node.random);
        PublishHandlerSender ps = new PublishHandlerSender(key.getKey(), 
node.random.nextLong(), packetNumber, encrypted, node, null, -1.0);
    }

}

--- NEW FILE: ClientSubscriptionImpl.java ---
package freenet.node;

import freenet.keys.ClientPublishStreamKey;
import freenet.support.Logger;

public class ClientSubscriptionImpl implements ClientSubscription {

    private ClientPublishStreamKey key;
    private SubscriptionCallback cb;
    private ClientSubscriptionHandler ch;
    private boolean finished = false;

    /**
     * @param key2
     * @param cb2
     * @param csh
     */
    public ClientSubscriptionImpl(ClientPublishStreamKey key2, 
SubscriptionCallback cb2, ClientSubscriptionHandler csh) {
        cb = cb2;
        key = key2;
        ch = csh;
    }

    public ClientPublishStreamKey getKey() {
        return key;
    }

    public SubscriptionCallback getCallback() {
        return cb;
    }

    public void disconnect() {
        finished = true;
        ch.remove(this);
    }

    /**
     * Received decrypted data from the stream.
     */
    public void processPacket(long packetNumber, byte[] finalData) {
        try {
            cb.got(packetNumber, finalData);
        } catch (Throwable t) {
            Logger.error(this, "Caught "+t+" from callback in processPacket("+
                    packetNumber+") on "+this, t);
        }
    }

}

--- NEW FILE: SubscriptionManager.java ---
package freenet.node;

import java.util.HashMap;

import freenet.keys.ClientPublishStreamKey;
import freenet.keys.PublishStreamKey;
import freenet.support.Logger;

/**
 * Tracks Publish/Subscribe streams:
 * - Local subscriptions.
 * - Remote subscriptions (other nodes subscribing to a stream through us).
 * - Whether we are root for a given stream.
 * - Nodes that we subscribe through to get a given stream (i.e. our tree 
parent).
 */
public class SubscriptionManager {

    // We have up to 32 subscriptions
    private final int MAX_COUNT = 32;

    private final Node node;
    /** map: key -> sub. definitively has all subs. */
    private final HashMap subscriptionsByKey;
//    /** map: parent node -> linkedlist of subs. does not include parent==null 
i.e. parent==this. */
//    private final HashMap subscriptionsByParent;
//    /** map: child node -> linkedlist of subs. some subs may have no children 
so won't be included. */
//    private final HashMap subscriptionsByChildren;
    /** Client subscriptions. These are the ClientSubscriptionHandler's. */
    private final HashMap clientSubscriptionsByKey;

    SubscriptionManager(Node n) {
        node = n;
        subscriptionsByKey = new HashMap();
//        subscriptionsByParent = new HashMap();
//        subscriptionsByChildren = new HashMap();
        clientSubscriptionsByKey = new HashMap();
    }

    /**
     * Local subscription.
     * Add the stream if necessary, and subscribe to it.
     * @return Null if cannot subscribe, otherwise the ClientSubscription.
     */
    public ClientSubscription localSubscribe(ClientPublishStreamKey key, 
SubscriptionCallback cb) {
        // FIXME implement doing a sub request
        // For now we will just eavesdrop on locally passed subs.
        // Each pretends it is the root
        ClientSubscriptionHandler csh;
        boolean add = false;
        synchronized(this) {
            csh = (ClientSubscriptionHandler) clientSubscriptionsByKey.get(key);
            if(csh == null) {
                csh = new ClientSubscriptionHandler(this, key);
                add = true;
                clientSubscriptionsByKey.put(key, csh);
            }
        }
        if(add) {
            Logger.minor(this, "New subscription to "+key);
            SubscriptionHandler sub = makeSubscription(key.getKey());
            if(sub.setLocal(csh) != null) {
                Logger.error(this, "Had local already! for "+key);
            }
        }
        ClientSubscriptionImpl csi = new ClientSubscriptionImpl(key, cb, csh);
        csh.add(csi);
        // FIXME implement the rest - especially sending sub reqs out
        return csi;
    }

    /**
     * Create a back-end subscription. This can be called
     * as a result of a subscribe request or of a local
     * subscription.
     */
    private synchronized SubscriptionHandler makeSubscription(PublishStreamKey 
key) {
        SubscriptionHandler sub = (SubscriptionHandler) 
subscriptionsByKey.get(key);
        if(sub != null) return sub;
        if(subscriptionsByKey.size() >= MAX_COUNT) {
            Logger.normal(this, "Rejecting subscription for "+key);
            return null;
        }

        // Make a new one
        sub = new SubscriptionHandler(key);
        subscriptionsByKey.put(key, sub);
        return sub;
    }

    public synchronized void remove(ClientSubscriptionHandler handler) {
        PublishStreamKey key = handler.getKey().getKey();
        SubscriptionHandler sub = (SubscriptionHandler) 
subscriptionsByKey.get(key);
        ClientSubscriptionHandler oldHandler = sub.setLocal(handler);
        if(oldHandler != null && oldHandler != handler) {
            Logger.error(this, "Already had a different handler: "+oldHandler+" 
should be "+handler);
        }
        if(sub.shouldDrop()) {
            drop(sub, key);
        }
    }

    private synchronized void drop(SubscriptionHandler sub, PublishStreamKey 
key) {
        subscriptionsByKey.remove(key);
    }

    /**
     * Handle a received packet.
     */
    public void receivedPacket(PublishStreamKey key, long packetNumber, byte[] 
packetData, PeerNode source) {
        SubscriptionHandler sub;
        synchronized(this) {
            sub = (SubscriptionHandler) subscriptionsByKey.get(key);
        }
        if(sub == null) {
            Logger.normal(this, "Dropped sub packet from "+source+" on "+key);
            return;
        }
        sub.processPacket(packetNumber, packetData, source);
    }
}

--- NEW FILE: ClientSubscriptionHandler.java ---
package freenet.node;

import java.util.Iterator;
import java.util.LinkedList;

import freenet.keys.ClientPublishStreamKey;
import freenet.support.Logger;

/**
 * Contains a list of ClientSubscriptionImpl's.
 * When they all disconnect, we unregister.
 */
public class ClientSubscriptionHandler {


    final ClientPublishStreamKey key;
    final SubscriptionManager sm;
    boolean finished = false;
    private final LinkedList clientSubs;

    ClientSubscriptionHandler(SubscriptionManager manager, 
ClientPublishStreamKey key) {
        clientSubs = new LinkedList();
        sm = manager;
        this.key = key;
    }

    synchronized void add(ClientSubscriptionImpl sub) {
        if(finished)
            throw new IllegalArgumentException();
        clientSubs.add(sub);
    }

    synchronized void remove(ClientSubscriptionImpl sub) {
        clientSubs.remove(sub);
        if(clientSubs.size() == 0) {
            finished = true;
            sm.remove(this);
        }
    }

    public ClientPublishStreamKey getKey() {
        return key;
    }

    /**
     * Received data!
     */
    public void processPacket(long packetNumber, byte[] packetData) {
        byte[] finalData = key.decrypt(packetNumber, packetData);
        if(finalData == null) {
            Logger.error(this, "Packet did not decrypt");
            return;
        }
        for(Iterator i=clientSubs.iterator();i.hasNext();) {
            ClientSubscriptionImpl impl = (ClientSubscriptionImpl)i.next();
            impl.processPacket(packetNumber, finalData);
        }
    }
}

--- NEW FILE: PublishHandlerSender.java ---
package freenet.node;

import java.util.HashSet;

import freenet.io.comm.DMT;
import freenet.io.comm.DisconnectedException;
import freenet.io.comm.Message;
import freenet.io.comm.MessageFilter;
import freenet.io.comm.NotConnectedException;
import freenet.keys.PublishStreamKey;
import freenet.support.Fields;
import freenet.support.Logger;
import freenet.support.ShortBuffer;

/**
 * Forwards a PublishData message. Routed, exactly as an insert.
 * Also communicates with the previous hop. For inserts and requests
 * we split this into two - Handler and Sender - but for this it is
 * simpler to have one, especially as there is no coalescing.
 */
public class PublishHandlerSender implements Runnable {

    // Basics
    final PublishStreamKey key;
    final double target;
    final long uid;
    private short htl;
    final PeerNode source;
    final Node node;
    final long packetNumber;
    final byte[] packetData;
    final Thread t;
    private double closestLocation;

    // Constants - FIXME way too high?
    static final int ACCEPTED_TIMEOUT = 10000;
    static final int COMPLETE_TIMEOUT = 120000;

    public String toString() {
        return super.toString()+": id="+uid+", source="+source+", 
packetNo="+packetNumber+", data="+Fields.hashCode(packetData);
    }


    /**
     * Create a PublishHandlerSender.
     * closestLocation must be reset by the caller if we are closer to target
     * than the node sending the request.
     */
    public PublishHandlerSender(PublishStreamKey key2, long id, long packetNo, 
byte[] data, Node n, PeerNode src, double closestLoc) {
        node = n;
        key = key2;
        target = key.toNormalizedDouble();
        uid = id;
        packetNumber = packetNo;
        packetData = data;
        source = src;
        closestLocation = closestLoc;
        htl = Node.MAX_HTL;
        t = new Thread(this);
        t.setDaemon(true);
        t.start();
    }

    /**
     * Create from an incoming Message. Does not check
     * UID/key for collision; this must be done by caller.
     * Will do everything else though, including resetting
     * the closestLocation if necessary.
     */
    public PublishHandlerSender(Message m, Node n) {
        this.node = n;
        key = (PublishStreamKey) m.getObject(DMT.KEY);
        target = key.toNormalizedDouble();
        uid = m.getLong(DMT.UID);
        packetNumber = m.getLong(DMT.STREAM_SEQNO);
        source = (PeerNode) m.getSource();
        packetData = ((ShortBuffer)m.getObject(DMT.DATA)).getData();
        closestLocation = m.getDouble(DMT.NEAREST_LOCATION);
        htl = m.getShort(DMT.HTL);
        double myLoc = n.lm.getLocation().getValue();
        if(Math.abs(myLoc - target) < Math.abs(closestLocation - target))
            closestLocation = myLoc;
        t = new Thread(this);
        t.setDaemon(true);
        t.start();
    }

    /** Very similar to InsertSender.run() */
    public void run() {
        Logger.minor(this, "Running "+this);
        try {
            // Copy it as it will be decrypted in place
            byte[] packetDataCopy = new byte[packetData.length];
            System.arraycopy(packetData, 0, packetDataCopy, 0, 
packetData.length);
            node.subscriptions.receivedPacket(key, packetNumber, 
packetDataCopy, source);
            short origHTL = htl;
            if(source != null) {
                Message msg = DMT.createFNPAccepted(uid);
                try {
                    source.sendAsync(msg, null);
                } catch (NotConnectedException e) {
                    Logger.normal(this, "Not connected sending Accepted on 
"+this);
                }
            }

            HashSet nodesRoutedTo = new HashSet();

            // Don't check whether source is connected; forward anyway

            while(true) {
                if(htl == 0) {
                    // Ran out of hops; success
                    if(source != null) {
                        Message msg = DMT.createFNPPublishDataSucceeded(uid);
                        try {
                            source.sendAsync(msg, null);
                        } catch (NotConnectedException e) {
                            Logger.minor(this, "Not connected sending 
FNPPublishDataSucceeded on "+this);
                        }
                    }
                    return;
                }

                // Route
                PeerNode next;

                // Can backtrack, so only route to nodes closer than we are to 
target.
                double nextValue;
                synchronized(node.peers) {
                    next = node.peers.closerPeer(source, nodesRoutedTo, target, 
true);
                    if(next != null)
                        nextValue = next.getLocation().getValue();
                    else
                        nextValue = -1.0;
                }

                if(next == null) {
                    // Backtrack
                    Logger.minor(this, "No more routes");
                    if(source != null) {
                        Message msg = DMT.createFNPRouteNotFound(uid, htl);
                        try {
                            source.sendAsync(msg, null);
                        } catch (NotConnectedException e) {
                            Logger.normal(this, "Not connected when RNFing on 
"+this);
                        }
                    } else {
                        Logger.minor(this, "RNF on "+this);
                    }
                    return;
                }
                Logger.minor(this, "Routing insert to "+next);
                nodesRoutedTo.add(next);

                if(Math.abs(target - nextValue) > Math.abs(target - 
closestLocation)) {
                    Logger.minor(this, "Backtracking: target="+target+" 
next="+nextValue+" closest="+closestLocation);
                    htl = node.decrementHTL(source, htl);
                }

                // Now we send it
                /**
                 * Possible responses:
                 * Accepted
                 * RejectedOverload
                 * RejectedLoop
                 * (timeout)
                 */
                Message msg = DMT.createFNPPublishData(htl, packetData, key, 
packetNumber, uid, closestLocation);

                MessageFilter mfAccepted = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
                MessageFilter mfRejectedLoop = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
                MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
                MessageFilter mfPublishDataInvalid = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPPublishDataInvalid);

                // mfRejectedOverload must be the last thing in the or
                // So its or pointer remains null
                // Otherwise we need to recreate it below
                MessageFilter mf = 
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload.or(mfPublishDataInvalid)));
                mfRejectedOverload.clearOr();

                try {
                    next.send(msg);
                } catch (NotConnectedException e) {
                    Logger.minor(this, "Not connected to "+next+" - skipping");
                    continue;
                }

                try {
                    msg = null;
                    msg = node.usm.waitFor(mf);
                } catch (DisconnectedException e1) {
                    Logger.minor(this, "Not connected to "+next+" while 
waiting, skipping");
                    continue;
                }

                if(msg == null) {
                    // Timeout waiting for Accepted
                    Logger.normal(this, "Timed out waiting for Accepted on 
"+this+" from "+next);
                    continue;
                }

                if(msg.getSpec() == DMT.FNPRejectedLoop) {
                    Logger.minor(this, "Rejected: loop for "+this);
                    continue;
                }

                if(msg.getSpec() == DMT.FNPRejectedOverload) {
                    // Propagate back to source; fatal
                    if(source != null) {
                        Message m = DMT.createFNPRejectedOverload(uid);
                        try {
                            source.sendAsync(m,null);
                        } catch (NotConnectedException e2) {
                            Logger.normal(this, "Source disconnected relaying 
rejected:overload from "+next+" for "+this);
                        }
                    } else {
                        Logger.normal(this, "FNPDataPublish rejected: overload 
from "+next+" for "+this);
                    }
                    return;
                }

                if(msg.getSpec() != DMT.FNPAccepted) {
                    throw new IllegalStateException("Unrecognized message: 
"+msg+" while waiting for Accepted on "+this);
                }

                // Got an Accepted; wait for a success

                MessageFilter mfSucceeded = 
MessageFilter.create().setTimeout(COMPLETE_TIMEOUT).setSource(next).setField(DMT.UID,
 uid).setType(DMT.FNPPublishDataSucceeded);
                MessageFilter mfRNF = 
MessageFilter.create().setTimeout(COMPLETE_TIMEOUT).setSource(next).setField(DMT.UID,
 uid).setType(DMT.FNPRouteNotFound);
                mfRejectedOverload.clearOr();
                mfPublishDataInvalid.clearOr();

                mf = 
mfSucceeded.or(mfRNF.or(mfRejectedOverload.or(mfPublishDataInvalid)));

                msg = null;
                try {
                    node.usm.waitFor(mf);
                } catch (DisconnectedException e3) {
                    Logger.error(this, "Disconnected from next: "+next+" while 
waiting for completion on "+this);
                    continue; // can cause load multiplication, hence the 
error; what else are we supposed to do though?
                }

                if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
                    // Timeout
                    Logger.error(this, "Timeout waiting for completion of 
"+this);
                    // Propagate back to source; fatal
                    if(source != null) {
                        Message m = DMT.createFNPRejectedOverload(uid);
                        try {
                            source.sendAsync(m,null);
                        } catch (NotConnectedException e2) {
                            Logger.normal(this, "Source disconnected relaying 
rejected:overload from "+next+" for "+this);
                        }
                    } else {
                        Logger.normal(this, "FNPDataPublish rejected: overload 
from "+next+" for "+this);
                    }
                    return;
                }

                if(msg.getSpec() == DMT.FNPRouteNotFound) {
                    Logger.minor(this, "Rejected: RNF");
                    // Still gets the data - but not yet
                    short newHtl = msg.getShort(DMT.HTL);
                    if(htl > newHtl) htl = newHtl;
                    continue;

                }

                if(msg.getSpec() == DMT.FNPPublishDataInvalid) {
                    Logger.error(this, "Got data invalid from "+next+" for 
"+this);
                    // FIXME: check validity ourself, propagate error if needed
                    continue;
                }

                if(msg.getSpec() != DMT.FNPPublishDataSucceeded) {
                    throw new IllegalStateException("Got unexpected message 
"+msg+" waiting for completion on "+this);
                }

                // Success
                return;
            }
        } catch (Throwable t) {
            Logger.error(this, "Caught "+t+" in "+this, t);
        }
    }

}

--- NEW FILE: ClientSubscription.java ---
package freenet.node;

import freenet.keys.ClientPublishStreamKey;

/**
 * A client subscription.
 */
public interface ClientSubscription {

    public ClientPublishStreamKey getKey();

    public SubscriptionCallback getCallback();

    public void disconnect();

}

--- NEW FILE: SubscriptionHandler.java ---
package freenet.node;

import java.util.LinkedList;

import freenet.keys.PublishStreamKey;
import freenet.support.Logger;
import freenet.support.NumberedItem;
import freenet.support.NumberedRecentItems;

/**
 * A single subscription.
 * May have a parent node, or may be the root.
 * May have many child nodes.
 */
public class SubscriptionHandler {

    static final int KEEP_PACKETS = 32;

    final PublishStreamKey key;
    ClientSubscriptionHandler localSubscribers;
    final LinkedList subscriberPeers;
    final NumberedRecentItems packets;

    public SubscriptionHandler(PublishStreamKey k) {
        key = k;
        subscriberPeers = new LinkedList();
        packets = new NumberedRecentItems(KEEP_PACKETS, true);
    }

    /**
     * Set the local subscribers handler.
     * @return The previous local subscribers handler - should be null!
     */
    public synchronized ClientSubscriptionHandler 
setLocal(ClientSubscriptionHandler csh) {
        ClientSubscriptionHandler h = localSubscribers;
        localSubscribers = csh;
        return h;
    }

    public synchronized boolean shouldDrop() {
        return (subscriberPeers.isEmpty() && localSubscribers == null);
    }

    /**
     * Process an incoming PublishData packet.
     */
    public void processPacket(long packetNumber, byte[] packetData, PeerNode 
source) {
        // First, have we seen it before?
        if(!packets.add(new PacketItem(packetNumber, packetData))) {
            Logger.minor(this, "Got packet "+packetNumber+" on stream "+key+" 
twice");
            return;
        }
        // FIXME: redistribute it to node subscribers
        // Redistribute it to local subscribers
        localSubscribers.processPacket(packetNumber, packetData);
    }

    public class PacketItem implements NumberedItem {

        long packetNumber;
        byte[] data;

        public PacketItem(long packetNumber, byte[] packetData) {
            this.packetNumber = packetNumber;
            data = packetData;
        }

        public long getNumber() {
            return packetNumber;
        }

    }

}

Index: Node.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/Node.java,v
retrieving revision 1.47
retrieving revision 1.48
diff -u -w -r1.47 -r1.48
--- Node.java   7 Sep 2005 14:42:53 -0000       1.47
+++ Node.java   15 Sep 2005 18:16:04 -0000      1.48
@@ -22,7 +22,7 @@
 import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashSet;
+import java.util.Hashtable;

 import freenet.crypt.DiffieHellman;
 import freenet.crypt.RandomSource;
@@ -31,6 +31,7 @@
 import freenet.io.comm.DisconnectedException;
 import freenet.io.comm.Message;
 import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.Peer;
 import freenet.io.comm.PeerParseException;
 import freenet.io.comm.UdpSocketManager;
@@ -39,6 +40,7 @@
 import freenet.keys.CHKVerifyException;
 import freenet.keys.ClientCHK;
 import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientPublishStreamKey;
 import freenet.keys.NodeCHK;
 import freenet.store.BaseFreenetStore;
 import freenet.store.FreenetStore;
@@ -87,6 +89,11 @@
     private final HashMap transferringRequestSenders;
     /** InsertSender's currently running, by KeyHTLPair */
     private final HashMap insertSenders;
+    /** Subscriptions */
+    final SubscriptionManager subscriptions;
+    
+    /** Locally published stream contexts */
+    private final Hashtable localStreamContexts;

     private final HashSet runningUIDs;

@@ -276,6 +283,7 @@

         ps = new PacketSender(this);
         peers = new PeerManager(this, prefix+"peers-"+portNumber);
+        subscriptions = new SubscriptionManager(this);

         try {
             usm = new UdpSocketManager(portNumber);
@@ -289,6 +297,7 @@
         decrementAtMax = random.nextDouble() <= DECREMENT_AT_MAX_PROB;
         decrementAtMin = random.nextDouble() <= DECREMENT_AT_MIN_PROB;
         bootID = random.nextLong();
+        localStreamContexts = new Hashtable();
         peers.writePeers();
     }

@@ -688,4 +697,69 @@
         myName = key;
         writeNodeFile();
     }
+    
+    /** Create a publish stream and create context needed to insert data on 
it. */
+    public ClientPublishStreamKey createPublishStream() {
+        ClientPublishStreamKey key = 
+            ClientPublishStreamKey.createRandom(random);
+        makePublishContext(key);
+        return key;
+    }
+    
+    private PublishContext makePublishContext(ClientPublishStreamKey key) {
+        synchronized(localStreamContexts) {
+            PublishContext ctx = (PublishContext) localStreamContexts.get(key);
+            if(ctx == null) {
+                // Which it usually does
+                ctx = new PublishContext(key, this);
+                localStreamContexts.put(key, ctx);
+                return ctx;
+            } else {
+                Logger.error(this, "Reusing existing publish context for 
"+key);
+                // Reuse
+                return ctx;
+            }
+        }
+    }
+
+    public PublishHandlerSender makePublishHandlerSender(Message m) {
+        long id = m.getLong(DMT.UID);
+        Long lid = new Long(id);
+        boolean reject = false;
+        try {
+            synchronized(this) {
+                if(recentlyCompletedIDs.contains(lid))
+                    reject = true;
+                else if(runningUIDs.contains(lid))
+                    reject = true;
+                else runningUIDs.add(lid);
+            }
+            if(reject) {
+                try {
+                    Message msg = DMT.createFNPRejectedLoop(id);
+                    ((PeerNode)m.getSource()).sendAsync(msg, null);
+                } catch (NotConnectedException e) {
+                    Logger.normal(this, "Lost connection refusing PublishData: 
"+id);
+                }
+                return null;
+            } else {
+                return new PublishHandlerSender(m, this);
+            }
+        } catch (Throwable t) {
+            synchronized(this) {
+                runningUIDs.remove(lid);
+                recentlyCompletedIDs.remove(lid);
+            }
+            Logger.error(this, "Caught "+t+" handling PublishData "+id, t);
+            return null;
+        }
+    }
+    
+    public void publish(ClientPublishStreamKey key, byte[] data) {
+        makePublishContext(key).publish(data);
+    }
+
+    public ClientSubscription subscribe(ClientPublishStreamKey key, 
SubscriptionCallback cb) {
+        return subscriptions.localSubscribe(key, cb);
+    }
 }

Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/Version.java,v
retrieving revision 1.130
retrieving revision 1.131
diff -u -w -r1.130 -r1.131
--- Version.java        7 Sep 2005 18:40:56 -0000       1.130
+++ Version.java        15 Sep 2005 18:16:04 -0000      1.131
@@ -20,10 +20,10 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 135;
+       public static final int buildNumber = 136;

        /** Oldest build of Fred we will talk to */
-       public static final int lastGoodBuild = 131;
+       public static final int lastGoodBuild = 136;

        /** The highest reported build of fred */
        public static int highestSeenBuild = buildNumber;

Index: NodeDispatcher.java
===================================================================
RCS file: 
/cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/NodeDispatcher.java,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -w -r1.22 -r1.23
--- NodeDispatcher.java 7 Sep 2005 15:22:16 -0000       1.22
+++ NodeDispatcher.java 15 Sep 2005 18:16:04 -0000      1.23
@@ -6,6 +6,7 @@
 import freenet.io.comm.DMT;
 import freenet.io.comm.Dispatcher;
 import freenet.io.comm.Message;
+import freenet.io.comm.MessageType;
 import freenet.io.comm.NotConnectedException;
 import freenet.support.Logger;

@@ -48,45 +49,52 @@
             }
             return true;
         }
-        if(m.getSpec() == DMT.FNPLocChangeNotification) {
+        MessageType spec = m.getSpec();
+        if(spec == DMT.FNPLocChangeNotification) {
             double newLoc = m.getDouble(DMT.LOCATION);
             source.updateLocation(newLoc);
             return true;
-        }
-        if(m.getSpec() == DMT.FNPSwapRequest) {
+        } else if(spec == DMT.FNPSwapRequest) {
             return node.lm.handleSwapRequest(m);
-        }
-        if(m.getSpec() == DMT.FNPSwapReply) {
+        } else if(spec == DMT.FNPSwapReply) {
             return node.lm.handleSwapReply(m);
-        }
-        if(m.getSpec() == DMT.FNPSwapRejected) {
+        } else if(spec == DMT.FNPSwapRejected) {
             return node.lm.handleSwapRejected(m);
-        }
-        if(m.getSpec() == DMT.FNPSwapCommit) {
+        } else if(spec == DMT.FNPSwapCommit) {
             return node.lm.handleSwapCommit(m);
-        }
-        if(m.getSpec() == DMT.FNPSwapComplete) {
+        } else if(spec == DMT.FNPSwapComplete) {
             return node.lm.handleSwapComplete(m);
-        }
-        if(m.getSpec() == DMT.FNPRoutedPing) {
+        } else if(spec == DMT.FNPRoutedPing) {
             return handleRouted(m);
-        }
-        if(m.getSpec() == DMT.FNPRoutedPong) {
+        } else if(spec == DMT.FNPRoutedPong) {
             return handleRoutedReply(m);
-        }
-        if(m.getSpec() == DMT.FNPRoutedRejected) {
+        } else if(spec == DMT.FNPRoutedRejected) {
             return handleRoutedRejected(m);
-        }
-        if(m.getSpec() == DMT.FNPDataRequest) {
+        } else if(spec == DMT.FNPDataRequest) {
             return handleDataRequest(m);
-        }
-        if(m.getSpec() == DMT.FNPInsertRequest) {
+        } else if(spec == DMT.FNPInsertRequest) {
             return handleInsertRequest(m);
+        } else if(spec == DMT.FNPPublishData) {
+            return handlePublishData(m);
         }
+//        } // SubscribeData, SubscribeRestarted etc handled by 
SubscribeSender.
         return false;
     }

     /**
+     * Handle an FNPPublishData message.
+     * @return False to put it back onto the queue.
+     */
+    private boolean handlePublishData(Message m) {
+        // Create a PublishSender. This will do all the work.
+        // FIXME maybe we should check whether we've sent the packet before?
+        // It's not really a viable DoS but it is good practice...
+        PublishHandlerSender ps =
+            node.makePublishHandlerSender(m);
+        return true;
+    }
+
+    /**
      * Handle an incoming FNPDataRequest.
      */
     private boolean handleDataRequest(Message m) {

Index: SimpleClient.java
===================================================================
RCS file: 
/cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/SimpleClient.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -w -r1.1 -r1.2
--- SimpleClient.java   16 Jul 2005 12:00:34 -0000      1.1
+++ SimpleClient.java   15 Sep 2005 18:16:04 -0000      1.2
@@ -2,6 +2,7 @@

 import freenet.keys.ClientCHK;
 import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientPublishStreamKey;

 /**
  * @author amphibian
@@ -13,8 +14,32 @@
  */
 public interface SimpleClient {

+    /**
+     * Fetch a key. Return null if cannot retrieve it.
+     */
     public ClientCHKBlock getCHK(ClientCHK key);

+    /**
+     * Insert a key.
+     */
     public void putCHK(ClientCHKBlock key);

+    /**
+     * Create a publish/subscribe stream key. Create any context
+     * needed to insert it.
+     */
+    public ClientPublishStreamKey createPublishStream();
+    
+    /**
+     * Publish a block of data to a publish/subscribe key.
+     */
+    public void publish(ClientPublishStreamKey key, byte[] data);
+
+    /**
+     * Subscribe to a stream.
+     * @param key The stream key.
+     * @param cb Callback to notify when we get data.
+     * @return True if the subscribe succeeded.
+     */
+    public ClientSubscription subscribe(ClientPublishStreamKey key, 
SubscriptionCallback cb);
 }

Index: TextModeClientInterface.java
===================================================================
RCS file: 
/cvsroot/freenet/Freenet0.7Rewrite/src/freenet/node/TextModeClientInterface.java,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -w -r1.22 -r1.23
--- TextModeClientInterface.java        7 Sep 2005 18:40:56 -0000       1.22
+++ TextModeClientInterface.java        15 Sep 2005 18:16:04 -0000      1.23
@@ -9,7 +9,9 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
 import java.net.MalformedURLException;
+import java.util.Hashtable;

 import freenet.crypt.RandomSource;
 import freenet.io.comm.PeerParseException;
@@ -18,6 +20,7 @@
 import freenet.keys.CHKEncodeException;
 import freenet.keys.ClientCHK;
 import freenet.keys.ClientCHKBlock;
+import freenet.keys.ClientPublishStreamKey;
 import freenet.keys.FreenetURI;
 import freenet.support.Logger;
 import freenet.support.SimpleFieldSet;
@@ -31,12 +34,14 @@
  */
 public class TextModeClientInterface implements Runnable {

-    RandomSource r;
-    Node n;
+    final RandomSource r;
+    final Node n;
+    final Hashtable streams;

     TextModeClientInterface(Node n) {
         this.n = n;
         this.r = n.random;
+        streams = new Hashtable();
         new Thread(this).start();
     }

@@ -54,6 +59,9 @@
         System.out.println("PUT:<text> - put a single line of text to a CHK 
and return the key.");
         System.out.println("PUTFILE:<filename> - put a file from disk.");
         System.out.println("GETFILE:<filename> - fetch a key and put it in a 
file. If the key includes a filename we will use it but we will not overwrite 
local files.");
+        System.out.println("PUBLISH:<name> - create a publish/subscribe stream 
called <name>");
+        System.out.println("PUSH:<name>:<text> - publish a single line of text 
to the stream named");
+        System.out.println("SUBSCRIBE:<key> - subscribe to a publish/subscribe 
stream by key");
         System.out.println("CONNECT:<filename> - connect to a node from its 
ref in a file.");
         System.out.println("CONNECT:\n<noderef including an End on a line by 
itself> - enter a noderef directly.");
         System.out.println("NAME:<new node name> - change the node's name.");
@@ -75,7 +83,7 @@
     /**
      * 
      */
-    private void processLine(BufferedReader reader) {
+    private void processLine(BufferedReader reader) throws 
UnsupportedEncodingException {
         String line;
         try {
             line = reader.readLine();
@@ -84,7 +92,7 @@
             return;
         }
         if(line == null) line = "QUIT";
-        String uline = line.toLowerCase();
+        String uline = line.toUpperCase();
         if(uline.startsWith("GET:")) {
             // Should have a key next
             String key = line.substring("GET:".length());
@@ -252,6 +260,53 @@
                 System.out.println("Threw: "+t);
                 t.printStackTrace();
             }
+        } else if(line.startsWith("PUBLISH:")) {
+            line = line.substring("PUBLISH:".length());
+            line = line.trim();
+            System.out.println("Stream name: "+line);
+            ClientPublishStreamKey key = n.createPublishStream();
+            FreenetURI streamKey = key.getURI();
+            streamKey = streamKey.setDocName(line);
+            System.out.println("Stream key: "+streamKey);
+            streams.put(line, key);
+        } else if(line.startsWith("PUSH:")) {
+            // PUSH:<name>:<text>
+            line = line.substring("PUSH:".length());
+            line = line.trim();
+            int index = line.indexOf(':');
+            if(index == -1) {
+                System.err.println("What do you want me to publish?");
+                return;
+            }
+            String name = line.substring(0, index);
+            ClientPublishStreamKey key = (ClientPublishStreamKey) 
streams.get(name);
+            if(key == null) {
+                System.err.println("Could not find stream called "+name);
+            } else {
+                String content;
+                if(line.length() > index+1) {
+                    content = line.substring(index+1);
+                } else {
+                    content = readLines(reader, false);
+                }
+                System.out.println("Publishing to "+key);
+                System.out.println("Data to publish:\n"+content);
+                n.publish(key, content.getBytes("UTF-8"));
+            }
+        } else if(uline.startsWith("SUBSCRIBE:")) {
+            line = line.substring("SUBSCRIBE:".length());
+            line = line.trim();
+            try {
+                FreenetURI uri = new FreenetURI(line);
+                ClientPublishStreamKey key = new ClientPublishStreamKey(uri);
+                n.subscribe(key, new MySubscriptionCallback(uri.getDocName()));
+                System.out.println("Subscribed.");
+            } catch (MalformedURLException e1) {
+                System.err.println("Invalid URI: "+e1.getMessage());
+                e1.printStackTrace();
+                return;
+            }
+            
         } else if(uline.startsWith("STATUS")) {
             SimpleFieldSet fs = n.exportFieldSet();
             System.out.println(fs.toString());
@@ -408,6 +463,33 @@
         return sb.toString();
     }

+    /**
+     * 
+     * SubscriptionCallback that dumps output to stdout.
+     */
+    public class MySubscriptionCallback implements SubscriptionCallback {
+
+        final String name;
+        
+        public MySubscriptionCallback(String string) {
+            name = string;
+        }
+
+        public void got(long packetNumber, byte[] data) {
+            System.out.println(name+":"+packetNumber+":"+new String(data));
+        }
+
+        public void lostConnection() {
+            System.out.println(name+":LOST CONNECTION");
+        }

+        public void restarted() {
+            System.out.println(name+":RESTARTED");
+        }
+
+        public void connected() {
+            System.out.println(name+":CONNECTED");
+        }
+    }

 }


Reply via email to