Author: toad
Date: 2005-10-21 13:34:42 +0000 (Fri, 21 Oct 2005)
New Revision: 7444

Removed:
   trunk/freenet/src/freenet/node/ClientSubscription.java
   trunk/freenet/src/freenet/node/ClientSubscriptionHandler.java
   trunk/freenet/src/freenet/node/ClientSubscriptionImpl.java
   trunk/freenet/src/freenet/node/PublishContext.java
   trunk/freenet/src/freenet/node/PublishHandlerSender.java
   trunk/freenet/src/freenet/node/SubscriptionCallback.java
   trunk/freenet/src/freenet/node/SubscriptionHandler.java
   trunk/freenet/src/freenet/node/SubscriptionManager.java
Modified:
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/SimpleClient.java
   trunk/freenet/src/freenet/node/TextModeClientInterface.java
Log:
Remove publish/subscribe from trunk.

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java  2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/io/comm/DMT.java  2005-10-21 13:34:42 UTC (rev 7444)
@@ -776,76 +776,6 @@
         return msg;
     }
     
-    public static final MessageType FNPPublishData = new 
MessageType("FNPPublishData") {{
-        addField(UID, Long.class);
-        addField(HTL, Short.class);
-        addField(DATA, ShortBuffer.class);
-        addField(KEY, PublishStreamKey.class);
-        addField(STREAM_SEQNO, Long.class);
-        addField(NEAREST_LOCATION, Double.class);
-    }};
-    
-    public static final Message createFNPPublishData(short htl, byte[] data, 
PublishStreamKey key, long seqNo, long id, double nearestLoc) {
-        Message msg = new Message(FNPPublishData);
-        msg.set(HTL, htl);
-        msg.set(KEY, key);
-        msg.set(DATA, new ShortBuffer(data));
-        msg.set(STREAM_SEQNO, seqNo);
-        msg.set(UID, id);
-        msg.set(NEAREST_LOCATION, nearestLoc);
-        return msg;
-    }
-    
-    public static final MessageType FNPSubscribeData = new 
MessageType("FNPSubscribeData") {{
-        addField(DATA, ShortBuffer.class);
-        addField(KEY, PublishStreamKey.class);
-        addField(STREAM_SEQNO, Long.class);
-    }};
-    
-    public static final Message createFNPSubscribeData(PublishStreamKey key, 
long seqNo, byte[] data) {
-        Message msg = new Message(FNPSubscribeData);
-        msg.set(KEY, key);
-        msg.set(STREAM_SEQNO, seqNo);
-        msg.set(DATA, new ShortBuffer(data));
-        return msg;
-    }
-    
-    public static final MessageType FNPPublishDataSucceeded = new 
MessageType("FNPPublishDataSucceeded") {{
-        addField(UID, Long.class);
-    }};
-    
-    public static final Message createFNPPublishDataSucceeded(long id) {
-        Message msg = new Message(FNPPublishDataSucceeded);
-        msg.set(UID, id);
-        return msg;
-    }
-    
-    public static final MessageType FNPPublishDataInvalid = new 
MessageType("FNPPublishDataInvalid") {{
-        addField(UID, Long.class);
-    }};
-    
-    public static final Message createFNPPublishDataInvalid(long id) {
-        Message msg = new Message(FNPPublishDataInvalid);
-        msg.set(UID, id);
-        return msg;
-    }
-    
-//    public static final MessageType FNPSubscribeRequest = new 
MessageType("FNPSubscribeRequest") {{
-//        addField(UID, Long.class);
-//        addField(HTL, Short.class);
-//        addField(KEY, PublishStreamKey.class);
-//        addField(STREAM_SEQNO, Long.class);
-//    }};
-//    
-//    public static final Message createFNPSubscribeRequest(long uid, short 
htl, PublishStreamKey key, long seqNo) {
-//        Message msg = new Message(FNPSubscribeRequest);
-//        msg.set(UID, uid);
-//        msg.set(HTL, htl);
-//        msg.set(KEY, key);
-//        msg.set(STREAM_SEQNO, seqNo);
-//        return msg;
-//    }
-//    
        public static void init() { }
 
 }

Deleted: trunk/freenet/src/freenet/node/ClientSubscription.java
===================================================================
--- trunk/freenet/src/freenet/node/ClientSubscription.java      2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/ClientSubscription.java      2005-10-21 13:34:42 UTC (rev 7444)
@@ -1,16 +0,0 @@
-package freenet.node;
-
-import freenet.keys.ClientPublishStreamKey;
-
-/**
- * A client subscription.
- */
-public interface ClientSubscription {
-
-    public ClientPublishStreamKey getKey();
-    
-    public SubscriptionCallback getCallback();
-    
-    public void disconnect();
-    
-}

Deleted: trunk/freenet/src/freenet/node/ClientSubscriptionHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/ClientSubscriptionHandler.java       2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/ClientSubscriptionHandler.java       2005-10-21 13:34:42 UTC (rev 7444)
@@ -1,59 +0,0 @@
-package freenet.node;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import freenet.keys.ClientPublishStreamKey;
-import freenet.support.Logger;
-
-/**
- * Contains a list of ClientSubscriptionImpl's.
- * When they all disconnect, we unregister.
- */
-public class ClientSubscriptionHandler {
-    
-    
-    final ClientPublishStreamKey key;
-    final SubscriptionManager sm;
-    boolean finished = false;
-    private final LinkedList clientSubs;
-    
-    ClientSubscriptionHandler(SubscriptionManager manager, 
ClientPublishStreamKey key) {
-        clientSubs = new LinkedList();
-        sm = manager;
-        this.key = key;
-    }
-    
-    synchronized void add(ClientSubscriptionImpl sub) {
-        if(finished)
-            throw new IllegalArgumentException();
-        clientSubs.add(sub);
-    }
-    
-    synchronized void remove(ClientSubscriptionImpl sub) {
-        clientSubs.remove(sub);
-        if(clientSubs.size() == 0) {
-            finished = true;
-            sm.remove(this);
-        }
-    }
-
-    public ClientPublishStreamKey getKey() {
-        return key;
-    }
-
-    /**
-     * Received data!
-     */
-    public void processPacket(long packetNumber, byte[] packetData) {
-        byte[] finalData = key.decrypt(packetNumber, packetData);
-        if(finalData == null) {
-            Logger.error(this, "Packet did not decrypt");
-            return;
-        }
-        for(Iterator i=clientSubs.iterator();i.hasNext();) {
-            ClientSubscriptionImpl impl = (ClientSubscriptionImpl)i.next();
-            impl.processPacket(packetNumber, finalData);
-        }
-    }
-}

Deleted: trunk/freenet/src/freenet/node/ClientSubscriptionImpl.java
===================================================================
--- trunk/freenet/src/freenet/node/ClientSubscriptionImpl.java  2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/ClientSubscriptionImpl.java  2005-10-21 13:34:42 UTC (rev 7444)
@@ -1,49 +0,0 @@
-package freenet.node;
-
-import freenet.keys.ClientPublishStreamKey;
-import freenet.support.Logger;
-
-public class ClientSubscriptionImpl implements ClientSubscription {
-
-    private ClientPublishStreamKey key;
-    private SubscriptionCallback cb;
-    private ClientSubscriptionHandler ch;
-    private boolean finished = false;
-    
-    /**
-     * @param key2
-     * @param cb2
-     * @param csh
-     */
-    public ClientSubscriptionImpl(ClientPublishStreamKey key2, 
SubscriptionCallback cb2, ClientSubscriptionHandler csh) {
-        cb = cb2;
-        key = key2;
-        ch = csh;
-    }
-
-    public ClientPublishStreamKey getKey() {
-        return key;
-    }
-
-    public SubscriptionCallback getCallback() {
-        return cb;
-    }
-
-    public void disconnect() {
-        finished = true;
-        ch.remove(this);
-    }
-
-    /**
-     * Received decrypted data from the stream.
-     */
-    public void processPacket(long packetNumber, byte[] finalData) {
-        try {
-            cb.got(packetNumber, finalData);
-        } catch (Throwable t) {
-            Logger.error(this, "Caught "+t+" from callback in processPacket("+
-                    packetNumber+") on "+this, t);
-        }
-    }
-
-}

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/Node.java    2005-10-21 13:34:42 UTC (rev 7444)
@@ -89,8 +89,6 @@
     private final HashMap transferringRequestSenders;
     /** InsertSender's currently running, by KeyHTLPair */
     private final HashMap insertSenders;
-    /** Subscriptions */
-    final SubscriptionManager subscriptions;
     
     /** Locally published stream contexts */
     private final Hashtable localStreamContexts;
@@ -283,7 +281,6 @@
         
         ps = new PacketSender(this);
         peers = new PeerManager(this, prefix+"peers-"+portNumber);
-        subscriptions = new SubscriptionManager(this);
         
         try {
             usm = new UdpSocketManager(portNumber);
@@ -697,69 +694,4 @@
         myName = key;
         writeNodeFile();
     }
-    
-    /** Create a publish stream and create context needed to insert data on 
it. */
-    public ClientPublishStreamKey createPublishStream() {
-        ClientPublishStreamKey key = 
-            ClientPublishStreamKey.createRandom(random);
-        makePublishContext(key);
-        return key;
-    }
-    
-    private PublishContext makePublishContext(ClientPublishStreamKey key) {
-        synchronized(localStreamContexts) {
-            PublishContext ctx = (PublishContext) localStreamContexts.get(key);
-            if(ctx == null) {
-                // Which it usually does
-                ctx = new PublishContext(key, this);
-                localStreamContexts.put(key, ctx);
-                return ctx;
-            } else {
-                Logger.error(this, "Reusing existing publish context for 
"+key);
-                // Reuse
-                return ctx;
-            }
-        }
-    }
-
-    public PublishHandlerSender makePublishHandlerSender(Message m) {
-        long id = m.getLong(DMT.UID);
-        Long lid = new Long(id);
-        boolean reject = false;
-        try {
-            synchronized(this) {
-                if(recentlyCompletedIDs.contains(lid))
-                    reject = true;
-                else if(runningUIDs.contains(lid))
-                    reject = true;
-                else runningUIDs.add(lid);
-            }
-            if(reject) {
-                try {
-                    Message msg = DMT.createFNPRejectedLoop(id);
-                    ((PeerNode)m.getSource()).sendAsync(msg, null);
-                } catch (NotConnectedException e) {
-                    Logger.normal(this, "Lost connection refusing PublishData: 
"+id);
-                }
-                return null;
-            } else {
-                return new PublishHandlerSender(m, this);
-            }
-        } catch (Throwable t) {
-            synchronized(this) {
-                runningUIDs.remove(lid);
-                recentlyCompletedIDs.remove(lid);
-            }
-            Logger.error(this, "Caught "+t+" handling PublishData "+id, t);
-            return null;
-        }
-    }
-    
-    public void publish(ClientPublishStreamKey key, byte[] data) {
-        makePublishContext(key).publish(data);
-    }
-
-    public ClientSubscription subscribe(ClientPublishStreamKey key, 
SubscriptionCallback cb) {
-        return subscriptions.localSubscribe(key, cb);
-    }
 }

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java  2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java  2005-10-21 13:34:42 UTC (rev 7444)
@@ -74,27 +74,11 @@
             return handleDataRequest(m);
         } else if(spec == DMT.FNPInsertRequest) {
             return handleInsertRequest(m);
-        } else if(spec == DMT.FNPPublishData) {
-            return handlePublishData(m);
         }
-//        } // SubscribeData, SubscribeRestarted etc handled by 
SubscribeSender.
         return false;
     }
 
     /**
-     * Handle an FNPPublishData message.
-     * @return False to put it back onto the queue.
-     */
-    private boolean handlePublishData(Message m) {
-        // Create a PublishSender. This will do all the work.
-        // FIXME maybe we should check whether we've sent the packet before?
-        // It's not really a viable DoS but it is good practice...
-        PublishHandlerSender ps =
-            node.makePublishHandlerSender(m);
-        return true;
-    }
-
-    /**
      * Handle an incoming FNPDataRequest.
      */
     private boolean handleDataRequest(Message m) {

Deleted: trunk/freenet/src/freenet/node/PublishContext.java
===================================================================
--- trunk/freenet/src/freenet/node/PublishContext.java  2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/PublishContext.java  2005-10-21 13:34:42 UTC (rev 7444)
@@ -1,41 +0,0 @@
-package freenet.node;
-
-import freenet.keys.ClientPublishStreamKey;
-
-/**
- * Context for a locally originated publish stream.
- */
-public class PublishContext {
-    
-    final ClientPublishStreamKey key;
-    final Node node;
-    /** Last packet number. Incremented each time, modulo 2^63-1 */
-    private long lastPacketNumber;
-    
-    /**
-     * @param key
-     */
-    public PublishContext(ClientPublishStreamKey key, Node node) {
-        this.key = key;
-        this.node = node;
-        lastPacketNumber = Math.abs(node.random.nextLong());
-    }
-
-    /**
-     * Publish a block of data to the stream. Must not exceed
-     * size limit.
-     */
-    public void publish(byte[] data) {
-        long packetNumber;
-        synchronized(this) {
-            packetNumber = lastPacketNumber;
-            long next = lastPacketNumber+1;
-            if(next < 0) next = 0;
-            lastPacketNumber = next;
-        }
-        byte[] encrypted = 
-            key.encrypt(packetNumber, data, node.random);
-        PublishHandlerSender ps = new PublishHandlerSender(key.getKey(), 
node.random.nextLong(), packetNumber, encrypted, node, null, -1.0);
-    }
-
-}

Deleted: trunk/freenet/src/freenet/node/PublishHandlerSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PublishHandlerSender.java    2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/PublishHandlerSender.java    2005-10-21 13:34:42 UTC (rev 7444)
@@ -1,287 +0,0 @@
-package freenet.node;
-
-import java.util.HashSet;
-
-import freenet.io.comm.DMT;
-import freenet.io.comm.DisconnectedException;
-import freenet.io.comm.Message;
-import freenet.io.comm.MessageFilter;
-import freenet.io.comm.NotConnectedException;
-import freenet.keys.PublishStreamKey;
-import freenet.support.Fields;
-import freenet.support.Logger;
-import freenet.support.ShortBuffer;
-
-/**
- * Forwards a PublishData message. Routed, exactly as an insert.
- * Also communicates with the previous hop. For inserts and requests
- * we split this into two - Handler and Sender - but for this it is
- * simpler to have one, especially as there is no coalescing.
- */
-public class PublishHandlerSender implements Runnable {
-
-    // Basics
-    final PublishStreamKey key;
-    final double target;
-    final long uid;
-    private short htl;
-    final PeerNode source;
-    final Node node;
-    final long packetNumber;
-    final byte[] packetData;
-    final Thread t;
-    private double closestLocation;
-    
-    // Constants - FIXME way too high?
-    static final int ACCEPTED_TIMEOUT = 10000;
-    static final int COMPLETE_TIMEOUT = 120000;
-
-    public String toString() {
-        return super.toString()+": id="+uid+", source="+source+", 
packetNo="+packetNumber+", data="+Fields.hashCode(packetData);
-    }
-    
-    
-    /**
-     * Create a PublishHandlerSender.
-     * closestLocation must be reset by the caller if we are closer to target
-     * than the node sending the request.
-     */
-    public PublishHandlerSender(PublishStreamKey key2, long id, long packetNo, 
byte[] data, Node n, PeerNode src, double closestLoc) {
-        node = n;
-        key = key2;
-        target = key.toNormalizedDouble();
-        uid = id;
-        packetNumber = packetNo;
-        packetData = data;
-        source = src;
-        closestLocation = closestLoc;
-        htl = Node.MAX_HTL;
-        t = new Thread(this);
-        t.setDaemon(true);
-        t.start();
-    }
-    
-    /**
-     * Create from an incoming Message. Does not check
-     * UID/key for collision; this must be done by caller.
-     * Will do everything else though, including resetting
-     * the closestLocation if necessary.
-     */
-    public PublishHandlerSender(Message m, Node n) {
-        this.node = n;
-        key = (PublishStreamKey) m.getObject(DMT.KEY);
-        target = key.toNormalizedDouble();
-        uid = m.getLong(DMT.UID);
-        packetNumber = m.getLong(DMT.STREAM_SEQNO);
-        source = (PeerNode) m.getSource();
-        packetData = ((ShortBuffer)m.getObject(DMT.DATA)).getData();
-        closestLocation = m.getDouble(DMT.NEAREST_LOCATION);
-        htl = m.getShort(DMT.HTL);
-        double myLoc = n.lm.getLocation().getValue();
-        if(Math.abs(myLoc - target) < Math.abs(closestLocation - target))
-            closestLocation = myLoc;
-        t = new Thread(this);
-        t.setDaemon(true);
-        t.start();
-    }
-
-    /** Very similar to InsertSender.run() */
-    public void run() {
-        Logger.minor(this, "Running "+this);
-        try {
-            // Copy it as it will be decrypted in place
-            byte[] packetDataCopy = new byte[packetData.length];
-            System.arraycopy(packetData, 0, packetDataCopy, 0, 
packetData.length);
-            node.subscriptions.receivedPacket(key, packetNumber, 
packetDataCopy, source);
-            short origHTL = htl;
-            if(source != null) {
-                Message msg = DMT.createFNPAccepted(uid);
-                try {
-                    source.sendAsync(msg, null);
-                } catch (NotConnectedException e) {
-                    Logger.normal(this, "Not connected sending Accepted on 
"+this);
-                }
-            }
-            
-            HashSet nodesRoutedTo = new HashSet();
-            
-            // Don't check whether source is connected; forward anyway
-            
-            while(true) {
-                if(htl == 0) {
-                    // Ran out of hops; success
-                    if(source != null) {
-                        Message msg = DMT.createFNPPublishDataSucceeded(uid);
-                        try {
-                            source.sendAsync(msg, null);
-                        } catch (NotConnectedException e) {
-                            Logger.minor(this, "Not connected sending 
FNPPublishDataSucceeded on "+this);
-                        }
-                    }
-                    return;
-                }
-                
-                // Route
-                PeerNode next;
-                
-                // Can backtrack, so only route to nodes closer than we are to 
target.
-                double nextValue;
-                synchronized(node.peers) {
-                    next = node.peers.closerPeer(source, nodesRoutedTo, 
target, true);
-                    if(next != null)
-                        nextValue = next.getLocation().getValue();
-                    else
-                        nextValue = -1.0;
-                }
-                
-                if(next == null) {
-                    // Backtrack
-                    Logger.minor(this, "No more routes");
-                    if(source != null) {
-                        Message msg = DMT.createFNPRouteNotFound(uid, htl);
-                        try {
-                            source.sendAsync(msg, null);
-                        } catch (NotConnectedException e) {
-                            Logger.normal(this, "Not connected when RNFing on 
"+this);
-                        }
-                    } else {
-                        Logger.minor(this, "RNF on "+this);
-                    }
-                    return;
-                }
-                Logger.minor(this, "Routing insert to "+next);
-                nodesRoutedTo.add(next);
-                
-                if(Math.abs(target - nextValue) > Math.abs(target - 
closestLocation)) {
-                    Logger.minor(this, "Backtracking: target="+target+" 
next="+nextValue+" closest="+closestLocation);
-                    htl = node.decrementHTL(source, htl);
-                }
-                
-                // Now we send it
-                /**
-                 * Possible responses:
-                 * Accepted
-                 * RejectedOverload
-                 * RejectedLoop
-                 * (timeout)
-                 */
-                Message msg = DMT.createFNPPublishData(htl, packetData, key, 
packetNumber, uid, closestLocation);
-                
-                MessageFilter mfAccepted = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
-                MessageFilter mfRejectedLoop = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
-                MessageFilter mfRejectedOverload = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
-                MessageFilter mfPublishDataInvalid = 
MessageFilter.create().setSource(next).setField(DMT.UID, 
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPPublishDataInvalid);
-                
-                // mfRejectedOverload must be the last thing in the or
-                // So its or pointer remains null
-                // Otherwise we need to recreate it below
-                MessageFilter mf = 
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload.or(mfPublishDataInvalid)));
-                mfRejectedOverload.clearOr();
-                
-                try {
-                    next.send(msg);
-                } catch (NotConnectedException e) {
-                    Logger.minor(this, "Not connected to "+next+" - skipping");
-                    continue;
-                }
-                
-                try {
-                    msg = null;
-                    msg = node.usm.waitFor(mf);
-                } catch (DisconnectedException e1) {
-                    Logger.minor(this, "Not connected to "+next+" while 
waiting, skipping");
-                    continue;
-                }
-                
-                if(msg == null) {
-                    // Timeout waiting for Accepted
-                    Logger.normal(this, "Timed out waiting for Accepted on 
"+this+" from "+next);
-                    continue;
-                }
-                
-                if(msg.getSpec() == DMT.FNPRejectedLoop) {
-                    Logger.minor(this, "Rejected: loop for "+this);
-                    continue;
-                }
-                
-                if(msg.getSpec() == DMT.FNPRejectedOverload) {
-                    // Propagate back to source; fatal
-                    if(source != null) {
-                        Message m = DMT.createFNPRejectedOverload(uid);
-                        try {
-                            source.sendAsync(m,null);
-                        } catch (NotConnectedException e2) {
-                            Logger.normal(this, "Source disconnected relaying 
rejected:overload from "+next+" for "+this);
-                        }
-                    } else {
-                        Logger.normal(this, "FNPDataPublish rejected: overload 
from "+next+" for "+this);
-                    }
-                    return;
-                }
-
-                if(msg.getSpec() != DMT.FNPAccepted) {
-                    throw new IllegalStateException("Unrecognized message: 
"+msg+" while waiting for Accepted on "+this);
-                }
-                
-                // Got an Accepted; wait for a success
-                
-                MessageFilter mfSucceeded = 
MessageFilter.create().setTimeout(COMPLETE_TIMEOUT).setSource(next).setField(DMT.UID,
 uid).setType(DMT.FNPPublishDataSucceeded);
-                MessageFilter mfRNF = 
MessageFilter.create().setTimeout(COMPLETE_TIMEOUT).setSource(next).setField(DMT.UID,
 uid).setType(DMT.FNPRouteNotFound);
-                mfRejectedOverload.clearOr();
-                mfPublishDataInvalid.clearOr();
-                
-                mf = 
mfSucceeded.or(mfRNF.or(mfRejectedOverload.or(mfPublishDataInvalid)));
-                
-                msg = null;
-                try {
-                    msg = node.usm.waitFor(mf);
-                } catch (DisconnectedException e3) {
-                    Logger.error(this, "Disconnected from next: "+next+" while 
waiting for completion on "+this);
-                    continue; // can cause load multiplication, hence the 
error; what else are we supposed to do though?
-                }
-                
-                if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
-                    // Timeout
-                    Logger.error(this, "Timeout waiting for completion of 
"+this+" : "+msg);
-                    // Propagate back to source; fatal
-                    if(source != null) {
-                        Message m = DMT.createFNPRejectedOverload(uid);
-                        try {
-                            source.sendAsync(m,null);
-                        } catch (NotConnectedException e2) {
-                            Logger.normal(this, "Source disconnected relaying 
rejected:overload from "+next+" for "+this);
-                        }
-                    } else {
-                        Logger.normal(this, "FNPDataPublish rejected: overload 
from "+next+" for "+this);
-                    }
-                    return;
-                }
-                
-                if(msg.getSpec() == DMT.FNPRouteNotFound) {
-                    Logger.minor(this, "Rejected: RNF");
-                    // Still gets the data - but not yet
-                    short newHtl = msg.getShort(DMT.HTL);
-                    if(htl > newHtl) htl = newHtl;
-                    continue;
-
-                }
-                
-                if(msg.getSpec() == DMT.FNPPublishDataInvalid) {
-                    Logger.error(this, "Got data invalid from "+next+" for 
"+this);
-                    // FIXME: check validity ourself, propagate error if needed
-                    continue;
-                }
-                
-                if(msg.getSpec() != DMT.FNPPublishDataSucceeded) {
-                    throw new IllegalStateException("Got unexpected message 
"+msg+" waiting for completion on "+this);
-                }
-                
-                // Success
-                return;
-            }
-        } catch (Throwable t) {
-            Logger.error(this, "Caught "+t+" in "+this, t);
-        }
-    }
-
-}

Modified: trunk/freenet/src/freenet/node/SimpleClient.java
===================================================================
--- trunk/freenet/src/freenet/node/SimpleClient.java    2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/SimpleClient.java    2005-10-21 13:34:42 UTC (rev 7444)
@@ -23,23 +23,4 @@
      * Insert a key.
      */
     public void putCHK(ClientCHKBlock key);
-
-    /**
-     * Create a publish/subscribe stream key. Create any context
-     * needed to insert it.
-     */
-    public ClientPublishStreamKey createPublishStream();
-    
-    /**
-     * Publish a block of data to a publish/subscribe key.
-     */
-    public void publish(ClientPublishStreamKey key, byte[] data);
-
-    /**
-     * Subscribe to a stream.
-     * @param key The stream key.
-     * @param cb Callback to notify when we get data.
-     * @return True if the subscribe succeeded.
-     */
-    public ClientSubscription subscribe(ClientPublishStreamKey key, 
SubscriptionCallback cb);
 }

Deleted: trunk/freenet/src/freenet/node/SubscriptionCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/SubscriptionCallback.java    2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/SubscriptionCallback.java    2005-10-21 13:34:42 UTC (rev 7444)
@@ -1,16 +0,0 @@
-package freenet.node;
-
-/**
- * Publish/Subscribe subscriber callback.
- */
-public interface SubscriptionCallback {
-
-    void got(long packetNumber, byte[] data);
-    
-    void lostConnection();
-    
-    void restarted();
-    
-    void connected();
-    
-}

Deleted: trunk/freenet/src/freenet/node/SubscriptionHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SubscriptionHandler.java     2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/SubscriptionHandler.java     2005-10-21 13:34:42 UTC (rev 7444)
@@ -1,121 +0,0 @@
-package freenet.node;
-
-import freenet.keys.PublishStreamKey;
-import freenet.support.Logger;
-import freenet.support.NumberedItem;
-import freenet.support.NumberedRecentItems;
-
-/**
- * A single subscription.
- * May have a parent node, or may be the root.
- * May have many child nodes.
- */
-public class SubscriptionHandler {
-    
-    static final int KEEP_PACKETS = 32;
-    
-    final PublishStreamKey key;
-    ClientSubscriptionHandler localSubscribers;
-    PeerNode[] subscriberPeers;
-    final NumberedRecentItems packets;
-
-    public SubscriptionHandler(PublishStreamKey k) {
-        key = k;
-        subscriberPeers = null;
-        packets = new NumberedRecentItems(KEEP_PACKETS, true);
-    }
-
-    /**
-     * Set the local subscribers handler.
-     * @return The previous local subscribers handler - should be null!
-     */
-    public synchronized ClientSubscriptionHandler 
setLocal(ClientSubscriptionHandler csh) {
-        ClientSubscriptionHandler h = localSubscribers;
-        localSubscribers = csh;
-        return h;
-    }
-
-    public synchronized boolean shouldDrop() {
-        return subscriberPeersEmpty() && localSubscribers == null;
-    }
-
-    /**
-     * @return True if there are no subscriber peers.
-     */
-    private final boolean subscriberPeersEmpty() {
-        PeerNode[] peers = subscriberPeers;
-        return (peers == null || peers.length == 0);
-    }
-
-    public void addSubscriberNode(PeerNode pn, long lastSeen) {
-        synchronized(this) {
-            if(subscriberPeers == null || subscriberPeers.length == 0) {
-                subscriberPeers = new PeerNode[] { pn };
-            } else {
-                PeerNode[] peers = new PeerNode[subscriberPeers.length+1];
-                System.arraycopy(subscriberPeers, 0, peers, 0, 
subscriberPeers.length);
-                peers[peers.length-1] = pn;
-                subscriberPeers = peers;
-            }
-        }
-        NumberedItem[] items = packets.getAfter(lastSeen);
-        if(items == null || items.length == 0) return;
-        for(int i=0;i<items.length;i++) {
-            PacketItem item = (PacketItem)items[i];
-            item.forwardTo(pn);
-        }
-    }
-    
-    /**
-     * Process an incoming PublishData packet.
-     */
-    public void processPacket(long packetNumber, byte[] packetData, PeerNode 
source) {
-        // First, have we seen it before?
-        PacketItem item = new PacketItem(packetNumber, packetData);
-        if(!packets.add(item)) {
-            Logger.minor(this, "Got packet "+packetNumber+" on stream "+key+" 
twice");
-            return;
-        }
-        PeerNode[] peers;
-        // We don't strictly need to synchronize, but
-        // if we don't we may lose packets.
-        synchronized(this) {
-            peers = subscriberPeers;
-        }
-        if(peers != null)
-            for(int i=0;i<peers.length;i++)
-                item.forwardTo(peers[i]);
-        
-        // Redistribute it to local subscribers
-        localSubscribers.processPacket(packetNumber, packetData);
-    }
-
-    public class PacketItem implements NumberedItem {
-
-        long packetNumber;
-        byte[] data;
-        
-        public PacketItem(long packetNumber, byte[] packetData) {
-            this.packetNumber = packetNumber;
-            data = packetData;
-        }
-
-        /**
-         * Forward this packet to a subscriber node.
-         * As an FNPSubscribeData. NOT an FNPPublishData.
-         * DO NOT CALL WITH LOCKS HELD, as sends packets.
-         * @param pn Node to send to.
-         */
-        public void forwardTo(PeerNode pn) {
-            
-            // TODO Auto-generated method stub
-            
-        }
-
-        public long getNumber() {
-            return packetNumber;
-        }
-
-    }
-
-}

Deleted: trunk/freenet/src/freenet/node/SubscriptionManager.java
===================================================================
--- trunk/freenet/src/freenet/node/SubscriptionManager.java     2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/SubscriptionManager.java     2005-10-21 13:34:42 UTC (rev 7444)
@@ -1,121 +0,0 @@
-package freenet.node;
-
-import java.util.HashMap;
-
-import freenet.keys.ClientPublishStreamKey;
-import freenet.keys.PublishStreamKey;
-import freenet.support.Logger;
-
-/**
- * Tracks Publish/Subscribe streams:
- * - Local subscriptions.
- * - Remote subscriptions (other nodes subscribing to a stream through us).
- * - Whether we are root for a given stream.
- * - Nodes that we subscribe through to get a given stream (i.e. our tree parent).
- */
-public class SubscriptionManager {
-    
-    // We have up to 32 subscriptions
-    private final int MAX_COUNT = 32;
-    
-    private final Node node;
-    /** map: key -> sub. definitively has all subs. */
-    private final HashMap subscriptionsByKey;
-//    /** map: parent node -> linkedlist of subs. does not include parent==null i.e. parent==this. */
-//    private final HashMap subscriptionsByParent;
-//    /** map: child node -> linkedlist of subs. some subs may have no children so won't be included. */
-//    private final HashMap subscriptionsByChildren;
-    /** Client subscriptions. These are the ClientSubscriptionHandler's. */
-    private final HashMap clientSubscriptionsByKey;
-    
-    SubscriptionManager(Node n) {
-        node = n;
-        subscriptionsByKey = new HashMap();
-//        subscriptionsByParent = new HashMap();
-//        subscriptionsByChildren = new HashMap();
-        clientSubscriptionsByKey = new HashMap();
-    }
-    
-    /**
-     * Local subscription.
-     * Add the stream if necessary, and subscribe to it.
-     * @return Null if cannot subscribe, otherwise the ClientSubscription.
-     */
-    public ClientSubscription localSubscribe(ClientPublishStreamKey key, SubscriptionCallback cb) {
-        // FIXME implement doing a sub request
-        // For now we will just eavesdrop on locally passed subs.
-        // Each pretends it is the root
-        ClientSubscriptionHandler csh;
-        boolean add = false;
-        Logger.minor(this, "Subscribing locally to "+key);
-        synchronized(this) {
-            csh = (ClientSubscriptionHandler) clientSubscriptionsByKey.get(key);
-            if(csh == null) {
-                csh = new ClientSubscriptionHandler(this, key);
-                add = true;
-                clientSubscriptionsByKey.put(key, csh);
-            }
-        }
-        if(add) {
-            Logger.minor(this, "New subscription to "+key);
-            SubscriptionHandler sub = makeSubscription(key.getKey());
-            if(sub.setLocal(csh) != null) {
-                Logger.error(this, "Had local already! for "+key);
-            }
-        }
-        ClientSubscriptionImpl csi = new ClientSubscriptionImpl(key, cb, csh);
-        csh.add(csi);
-        // FIXME implement the rest - especially sending sub reqs out
-        return csi;
-    }
-
-    /**
-     * Create a back-end subscription. This can be called
-     * as a result of a subscribe request or of a local
-     * subscription.
-     */
-    private synchronized SubscriptionHandler makeSubscription(PublishStreamKey key) {
-        SubscriptionHandler sub = (SubscriptionHandler) subscriptionsByKey.get(key);
-        if(sub != null) return sub;
-        if(subscriptionsByKey.size() >= MAX_COUNT) {
-            Logger.normal(this, "Rejecting subscription for "+key);
-            return null;
-        }
-        
-        // Make a new one
-        sub = new SubscriptionHandler(key);
-        subscriptionsByKey.put(key, sub);
-        return sub;
-    }
-
-    public synchronized void remove(ClientSubscriptionHandler handler) {
-        PublishStreamKey key = handler.getKey().getKey();
-        SubscriptionHandler sub = (SubscriptionHandler) subscriptionsByKey.get(key);
-        ClientSubscriptionHandler oldHandler = sub.setLocal(handler);
-        if(oldHandler != null && oldHandler != handler) {
-            Logger.error(this, "Already had a different handler: "+oldHandler+" should be "+handler);
-        }
-        if(sub.shouldDrop()) {
-            drop(sub, key);
-        }
-    }
-
-    private synchronized void drop(SubscriptionHandler sub, PublishStreamKey key) {
-        subscriptionsByKey.remove(key);
-    }
-
-    /**
-     * Handle a received packet.
-     */
-    public void receivedPacket(PublishStreamKey key, long packetNumber, byte[] packetData, PeerNode source) {
-        SubscriptionHandler sub;
-        synchronized(this) {
-            sub = (SubscriptionHandler) subscriptionsByKey.get(key);
-        }
-        if(sub == null) {
-            Logger.normal(this, "Dropped sub packet from "+source+" on "+key+" - not subscribed");
-            return;
-        }
-        sub.processPacket(packetNumber, packetData, source);
-    }
-}

Modified: trunk/freenet/src/freenet/node/TextModeClientInterface.java
===================================================================
--- trunk/freenet/src/freenet/node/TextModeClientInterface.java 2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/TextModeClientInterface.java 2005-10-21 13:34:42 UTC (rev 7444)
@@ -268,55 +268,6 @@
                 System.out.println("Threw: "+t);
                 t.printStackTrace();
             }
-        } else if(line.startsWith("PUBLISH:")) {
-            line = line.substring("PUBLISH:".length());
-            line = line.trim();
-            System.out.println("Stream name: "+line);
-            ClientPublishStreamKey key = n.createPublishStream();
-            FreenetURI streamKey = key.getURI();
-            streamKey = streamKey.setDocName(line);
-            System.out.println("Stream key: "+streamKey);
-            lastSendStreamName = line;
-            streams.put(line, key);
-        } else if(line.startsWith("PUSH:")) {
-            // PUSH:<name>:<text>
-            line = line.substring("PUSH:".length());
-            line = line.trim();
-            int index = line.indexOf(':');
-            if(index == -1) {
-                System.err.println("What do you want me to publish?");
-                return;
-            }
-            String name = line.substring(0, index);
-            ClientPublishStreamKey key = (ClientPublishStreamKey) streams.get(name);
-            if(key == null) {
-                System.err.println("Could not find stream called "+name);
-            } else {
-                String content;
-                if(line.length() > index+1) {
-                    content = line.substring(index+1);
-                } else {
-                    content = readLines(reader, false);
-                }
-                System.out.println("Publishing to "+key);
-                System.out.println("Data to publish:\n"+content);
-                n.publish(key, content.getBytes("UTF-8"));
-                lastSendStreamName = name;
-            }
-        } else if(uline.startsWith("SUBSCRIBE:")) {
-            line = line.substring("SUBSCRIBE:".length());
-            line = line.trim();
-            try {
-                FreenetURI uri = new FreenetURI(line);
-                ClientPublishStreamKey key = new ClientPublishStreamKey(uri);
-                n.subscribe(key, new MySubscriptionCallback(uri.getDocName()));
-                System.out.println("Subscribed to "+uri.getDocName()+".");
-            } catch (MalformedURLException e1) {
-                System.err.println("Invalid URI: "+e1.getMessage());
-                e1.printStackTrace();
-                return;
-            }
-            
         } else if(uline.startsWith("STATUS")) {
             SimpleFieldSet fs = n.exportFieldSet();
             System.out.println(fs.toString());
@@ -351,20 +302,6 @@
                 if(content.equals("")) return;
                 connect(content);
             }
-        } else if(uline.startsWith("SUBFILE:")) {
-            String filename = line.substring("SUBFILE:".length()).trim();
-            System.out.println("Writing all received subscription data to "+filename);
-            try {
-                FileOutputStream fos = new FileOutputStream(filename, true);
-                OutputStreamWriter w = new OutputStreamWriter(fos);
-                // Test it
-                w.write("Opened at "+System.currentTimeMillis()+" for writing subscribed data\n");
-                subscribedDataStream = w;
-                w.flush();
-            } catch (IOException e) {
-                System.err.println("Could not use file: "+e.getMessage());
-            }
-            
         } else if(uline.startsWith("NAME:")) {
             System.out.println("Node name currently: "+n.myName);
             String key = line.substring("NAME:".length());
@@ -374,18 +311,6 @@
                 key = key.substring(0, key.length()-2);
             System.out.println("New name: "+key);
             n.setName(key);
-        } else if(uline.startsWith("SAY ") || uline.startsWith("SAY:")) {
-            String toSay = line.substring("SAY:".length()).trim();
-            if(lastSendStreamName != null) {
-                ClientPublishStreamKey key = (ClientPublishStreamKey) streams.get(lastSendStreamName);
-                if(key == null) {
-                    System.err.println("Could not find stream called "+lastSendStreamName);
-                } else {
-                    System.out.println("Publishing to "+key);
-                    System.out.println("Data to publish:\n"+toSay);
-                    n.publish(key, toSay.getBytes("UTF-8"));
-                }
-            }
         } else {
             
         }
@@ -498,63 +423,4 @@
         }
         return sb.toString();
     }
-
-    /**
-     * 
-     * SubscriptionCallback that dumps output to stdout.
-     * FIXME this might block if stdout is redirected and disk is full...
-     */
-    public class MySubscriptionCallback implements SubscriptionCallback {
-
-        final String name;
-        
-        public MySubscriptionCallback(String string) {
-            name = string;
-        }
-
-        public void got(long packetNumber, byte[] data) {
-            try {
-                subscribedDataStream.write(name+":"+packetNumber+":"+new String(data)+"\n");
-                subscribedDataStream.flush();
-            } catch (IOException e) {
-                String s = "Error writing to subscriptions output file - disk full? "+e.getMessage();
-                Logger.error(this, s);
-                System.err.println(s);
-            }
-        }
-
-        public void lostConnection() {
-            try {
-                subscribedDataStream.write(name+":LOST CONNECTION\n");
-                subscribedDataStream.flush();
-            } catch (IOException e) {
-                String s = "Error writing to subscriptions output file - disk full? "+e.getMessage();
-                Logger.error(this, s);
-                System.err.println(s);
-            }
-        }
-
-        public void restarted() {
-            try {
-                subscribedDataStream.write(name+":RESTARTED\n");
-                subscribedDataStream.flush();
-            } catch (IOException e) {
-                String s = "Error writing to subscriptions output file - disk full? "+e.getMessage();
-                Logger.error(this, s);
-                System.err.println(s);
-            }
-        }
-
-        public void connected() {
-            try {
-                subscribedDataStream.write(name+":CONNECTED\n");
-                subscribedDataStream.flush();
-            } catch (IOException e) {
-                String s = "Error writing to subscriptions output file - disk full? "+e.getMessage();
-                Logger.error(this, s);
-                System.err.println(s);
-            }
-        }
-    }
-
 }

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to