Author: toad
Date: 2005-10-21 13:34:42 +0000 (Fri, 21 Oct 2005)
New Revision: 7444
Removed:
trunk/freenet/src/freenet/node/ClientSubscription.java
trunk/freenet/src/freenet/node/ClientSubscriptionHandler.java
trunk/freenet/src/freenet/node/ClientSubscriptionImpl.java
trunk/freenet/src/freenet/node/PublishContext.java
trunk/freenet/src/freenet/node/PublishHandlerSender.java
trunk/freenet/src/freenet/node/SubscriptionCallback.java
trunk/freenet/src/freenet/node/SubscriptionHandler.java
trunk/freenet/src/freenet/node/SubscriptionManager.java
Modified:
trunk/freenet/src/freenet/io/comm/DMT.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/NodeDispatcher.java
trunk/freenet/src/freenet/node/SimpleClient.java
trunk/freenet/src/freenet/node/TextModeClientInterface.java
Log:
Remove publish/subscribe from trunk.
Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java 2005-10-21 13:25:48 UTC (rev
7443)
+++ trunk/freenet/src/freenet/io/comm/DMT.java 2005-10-21 13:34:42 UTC (rev
7444)
@@ -776,76 +776,6 @@
return msg;
}
- public static final MessageType FNPPublishData = new
MessageType("FNPPublishData") {{
- addField(UID, Long.class);
- addField(HTL, Short.class);
- addField(DATA, ShortBuffer.class);
- addField(KEY, PublishStreamKey.class);
- addField(STREAM_SEQNO, Long.class);
- addField(NEAREST_LOCATION, Double.class);
- }};
-
- public static final Message createFNPPublishData(short htl, byte[] data,
PublishStreamKey key, long seqNo, long id, double nearestLoc) {
- Message msg = new Message(FNPPublishData);
- msg.set(HTL, htl);
- msg.set(KEY, key);
- msg.set(DATA, new ShortBuffer(data));
- msg.set(STREAM_SEQNO, seqNo);
- msg.set(UID, id);
- msg.set(NEAREST_LOCATION, nearestLoc);
- return msg;
- }
-
- public static final MessageType FNPSubscribeData = new
MessageType("FNPSubscribeData") {{
- addField(DATA, ShortBuffer.class);
- addField(KEY, PublishStreamKey.class);
- addField(STREAM_SEQNO, Long.class);
- }};
-
- public static final Message createFNPSubscribeData(PublishStreamKey key,
long seqNo, byte[] data) {
- Message msg = new Message(FNPSubscribeData);
- msg.set(KEY, key);
- msg.set(STREAM_SEQNO, seqNo);
- msg.set(DATA, new ShortBuffer(data));
- return msg;
- }
-
- public static final MessageType FNPPublishDataSucceeded = new
MessageType("FNPPublishDataSucceeded") {{
- addField(UID, Long.class);
- }};
-
- public static final Message createFNPPublishDataSucceeded(long id) {
- Message msg = new Message(FNPPublishDataSucceeded);
- msg.set(UID, id);
- return msg;
- }
-
- public static final MessageType FNPPublishDataInvalid = new
MessageType("FNPPublishDataInvalid") {{
- addField(UID, Long.class);
- }};
-
- public static final Message createFNPPublishDataInvalid(long id) {
- Message msg = new Message(FNPPublishDataInvalid);
- msg.set(UID, id);
- return msg;
- }
-
-// public static final MessageType FNPSubscribeRequest = new
MessageType("FNPSubscribeRequest") {{
-// addField(UID, Long.class);
-// addField(HTL, Short.class);
-// addField(KEY, PublishStreamKey.class);
-// addField(STREAM_SEQNO, Long.class);
-// }};
-//
-// public static final Message createFNPSubscribeRequest(long uid, short
htl, PublishStreamKey key, long seqNo) {
-// Message msg = new Message(FNPSubscribeRequest);
-// msg.set(UID, uid);
-// msg.set(HTL, htl);
-// msg.set(KEY, key);
-// msg.set(STREAM_SEQNO, seqNo);
-// return msg;
-// }
-//
public static void init() { }
}
Deleted: trunk/freenet/src/freenet/node/ClientSubscription.java
===================================================================
--- trunk/freenet/src/freenet/node/ClientSubscription.java 2005-10-21
13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/ClientSubscription.java 2005-10-21
13:34:42 UTC (rev 7444)
@@ -1,16 +0,0 @@
-package freenet.node;
-
-import freenet.keys.ClientPublishStreamKey;
-
-/**
- * A client subscription.
- */
-public interface ClientSubscription {
-
- public ClientPublishStreamKey getKey();
-
- public SubscriptionCallback getCallback();
-
- public void disconnect();
-
-}
Deleted: trunk/freenet/src/freenet/node/ClientSubscriptionHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/ClientSubscriptionHandler.java
2005-10-21 13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/ClientSubscriptionHandler.java
2005-10-21 13:34:42 UTC (rev 7444)
@@ -1,59 +0,0 @@
-package freenet.node;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import freenet.keys.ClientPublishStreamKey;
-import freenet.support.Logger;
-
-/**
- * Contains a list of ClientSubscriptionImpl's.
- * When they all disconnect, we unregister.
- */
-public class ClientSubscriptionHandler {
-
-
- final ClientPublishStreamKey key;
- final SubscriptionManager sm;
- boolean finished = false;
- private final LinkedList clientSubs;
-
- ClientSubscriptionHandler(SubscriptionManager manager,
ClientPublishStreamKey key) {
- clientSubs = new LinkedList();
- sm = manager;
- this.key = key;
- }
-
- synchronized void add(ClientSubscriptionImpl sub) {
- if(finished)
- throw new IllegalArgumentException();
- clientSubs.add(sub);
- }
-
- synchronized void remove(ClientSubscriptionImpl sub) {
- clientSubs.remove(sub);
- if(clientSubs.size() == 0) {
- finished = true;
- sm.remove(this);
- }
- }
-
- public ClientPublishStreamKey getKey() {
- return key;
- }
-
- /**
- * Received data!
- */
- public void processPacket(long packetNumber, byte[] packetData) {
- byte[] finalData = key.decrypt(packetNumber, packetData);
- if(finalData == null) {
- Logger.error(this, "Packet did not decrypt");
- return;
- }
- for(Iterator i=clientSubs.iterator();i.hasNext();) {
- ClientSubscriptionImpl impl = (ClientSubscriptionImpl)i.next();
- impl.processPacket(packetNumber, finalData);
- }
- }
-}
Deleted: trunk/freenet/src/freenet/node/ClientSubscriptionImpl.java
===================================================================
--- trunk/freenet/src/freenet/node/ClientSubscriptionImpl.java 2005-10-21
13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/ClientSubscriptionImpl.java 2005-10-21
13:34:42 UTC (rev 7444)
@@ -1,49 +0,0 @@
-package freenet.node;
-
-import freenet.keys.ClientPublishStreamKey;
-import freenet.support.Logger;
-
-public class ClientSubscriptionImpl implements ClientSubscription {
-
- private ClientPublishStreamKey key;
- private SubscriptionCallback cb;
- private ClientSubscriptionHandler ch;
- private boolean finished = false;
-
- /**
- * @param key2
- * @param cb2
- * @param csh
- */
- public ClientSubscriptionImpl(ClientPublishStreamKey key2,
SubscriptionCallback cb2, ClientSubscriptionHandler csh) {
- cb = cb2;
- key = key2;
- ch = csh;
- }
-
- public ClientPublishStreamKey getKey() {
- return key;
- }
-
- public SubscriptionCallback getCallback() {
- return cb;
- }
-
- public void disconnect() {
- finished = true;
- ch.remove(this);
- }
-
- /**
- * Received decrypted data from the stream.
- */
- public void processPacket(long packetNumber, byte[] finalData) {
- try {
- cb.got(packetNumber, finalData);
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t+" from callback in processPacket("+
- packetNumber+") on "+this, t);
- }
- }
-
-}
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2005-10-21 13:25:48 UTC (rev
7443)
+++ trunk/freenet/src/freenet/node/Node.java 2005-10-21 13:34:42 UTC (rev
7444)
@@ -89,8 +89,6 @@
private final HashMap transferringRequestSenders;
/** InsertSender's currently running, by KeyHTLPair */
private final HashMap insertSenders;
- /** Subscriptions */
- final SubscriptionManager subscriptions;
/** Locally published stream contexts */
private final Hashtable localStreamContexts;
@@ -283,7 +281,6 @@
ps = new PacketSender(this);
peers = new PeerManager(this, prefix+"peers-"+portNumber);
- subscriptions = new SubscriptionManager(this);
try {
usm = new UdpSocketManager(portNumber);
@@ -697,69 +694,4 @@
myName = key;
writeNodeFile();
}
-
- /** Create a publish stream and create context needed to insert data on
it. */
- public ClientPublishStreamKey createPublishStream() {
- ClientPublishStreamKey key =
- ClientPublishStreamKey.createRandom(random);
- makePublishContext(key);
- return key;
- }
-
- private PublishContext makePublishContext(ClientPublishStreamKey key) {
- synchronized(localStreamContexts) {
- PublishContext ctx = (PublishContext) localStreamContexts.get(key);
- if(ctx == null) {
- // Which it usually does
- ctx = new PublishContext(key, this);
- localStreamContexts.put(key, ctx);
- return ctx;
- } else {
- Logger.error(this, "Reusing existing publish context for
"+key);
- // Reuse
- return ctx;
- }
- }
- }
-
- public PublishHandlerSender makePublishHandlerSender(Message m) {
- long id = m.getLong(DMT.UID);
- Long lid = new Long(id);
- boolean reject = false;
- try {
- synchronized(this) {
- if(recentlyCompletedIDs.contains(lid))
- reject = true;
- else if(runningUIDs.contains(lid))
- reject = true;
- else runningUIDs.add(lid);
- }
- if(reject) {
- try {
- Message msg = DMT.createFNPRejectedLoop(id);
- ((PeerNode)m.getSource()).sendAsync(msg, null);
- } catch (NotConnectedException e) {
- Logger.normal(this, "Lost connection refusing PublishData:
"+id);
- }
- return null;
- } else {
- return new PublishHandlerSender(m, this);
- }
- } catch (Throwable t) {
- synchronized(this) {
- runningUIDs.remove(lid);
- recentlyCompletedIDs.remove(lid);
- }
- Logger.error(this, "Caught "+t+" handling PublishData "+id, t);
- return null;
- }
- }
-
- public void publish(ClientPublishStreamKey key, byte[] data) {
- makePublishContext(key).publish(data);
- }
-
- public ClientSubscription subscribe(ClientPublishStreamKey key,
SubscriptionCallback cb) {
- return subscriptions.localSubscribe(key, cb);
- }
}
Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java 2005-10-21 13:25:48 UTC
(rev 7443)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java 2005-10-21 13:34:42 UTC
(rev 7444)
@@ -74,27 +74,11 @@
return handleDataRequest(m);
} else if(spec == DMT.FNPInsertRequest) {
return handleInsertRequest(m);
- } else if(spec == DMT.FNPPublishData) {
- return handlePublishData(m);
}
-// } // SubscribeData, SubscribeRestarted etc handled by
SubscribeSender.
return false;
}
/**
- * Handle an FNPPublishData message.
- * @return False to put it back onto the queue.
- */
- private boolean handlePublishData(Message m) {
- // Create a PublishSender. This will do all the work.
- // FIXME maybe we should check whether we've sent the packet before?
- // It's not really a viable DoS but it is good practice...
- PublishHandlerSender ps =
- node.makePublishHandlerSender(m);
- return true;
- }
-
- /**
* Handle an incoming FNPDataRequest.
*/
private boolean handleDataRequest(Message m) {
Deleted: trunk/freenet/src/freenet/node/PublishContext.java
===================================================================
--- trunk/freenet/src/freenet/node/PublishContext.java 2005-10-21 13:25:48 UTC
(rev 7443)
+++ trunk/freenet/src/freenet/node/PublishContext.java 2005-10-21 13:34:42 UTC
(rev 7444)
@@ -1,41 +0,0 @@
-package freenet.node;
-
-import freenet.keys.ClientPublishStreamKey;
-
-/**
- * Context for a locally originated publish stream.
- */
-public class PublishContext {
-
- final ClientPublishStreamKey key;
- final Node node;
- /** Last packet number. Incremented each time, modulo 2^63-1 */
- private long lastPacketNumber;
-
- /**
- * @param key
- */
- public PublishContext(ClientPublishStreamKey key, Node node) {
- this.key = key;
- this.node = node;
- lastPacketNumber = Math.abs(node.random.nextLong());
- }
-
- /**
- * Publish a block of data to the stream. Must not exceed
- * size limit.
- */
- public void publish(byte[] data) {
- long packetNumber;
- synchronized(this) {
- packetNumber = lastPacketNumber;
- long next = lastPacketNumber+1;
- if(next < 0) next = 0;
- lastPacketNumber = next;
- }
- byte[] encrypted =
- key.encrypt(packetNumber, data, node.random);
- PublishHandlerSender ps = new PublishHandlerSender(key.getKey(),
node.random.nextLong(), packetNumber, encrypted, node, null, -1.0);
- }
-
-}
Deleted: trunk/freenet/src/freenet/node/PublishHandlerSender.java
===================================================================
--- trunk/freenet/src/freenet/node/PublishHandlerSender.java 2005-10-21
13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/PublishHandlerSender.java 2005-10-21
13:34:42 UTC (rev 7444)
@@ -1,287 +0,0 @@
-package freenet.node;
-
-import java.util.HashSet;
-
-import freenet.io.comm.DMT;
-import freenet.io.comm.DisconnectedException;
-import freenet.io.comm.Message;
-import freenet.io.comm.MessageFilter;
-import freenet.io.comm.NotConnectedException;
-import freenet.keys.PublishStreamKey;
-import freenet.support.Fields;
-import freenet.support.Logger;
-import freenet.support.ShortBuffer;
-
-/**
- * Forwards a PublishData message. Routed, exactly as an insert.
- * Also communicates with the previous hop. For inserts and requests
- * we split this into two - Handler and Sender - but for this it is
- * simpler to have one, especially as there is no coalescing.
- */
-public class PublishHandlerSender implements Runnable {
-
- // Basics
- final PublishStreamKey key;
- final double target;
- final long uid;
- private short htl;
- final PeerNode source;
- final Node node;
- final long packetNumber;
- final byte[] packetData;
- final Thread t;
- private double closestLocation;
-
- // Constants - FIXME way too high?
- static final int ACCEPTED_TIMEOUT = 10000;
- static final int COMPLETE_TIMEOUT = 120000;
-
- public String toString() {
- return super.toString()+": id="+uid+", source="+source+",
packetNo="+packetNumber+", data="+Fields.hashCode(packetData);
- }
-
-
- /**
- * Create a PublishHandlerSender.
- * closestLocation must be reset by the caller if we are closer to target
- * than the node sending the request.
- */
- public PublishHandlerSender(PublishStreamKey key2, long id, long packetNo,
byte[] data, Node n, PeerNode src, double closestLoc) {
- node = n;
- key = key2;
- target = key.toNormalizedDouble();
- uid = id;
- packetNumber = packetNo;
- packetData = data;
- source = src;
- closestLocation = closestLoc;
- htl = Node.MAX_HTL;
- t = new Thread(this);
- t.setDaemon(true);
- t.start();
- }
-
- /**
- * Create from an incoming Message. Does not check
- * UID/key for collision; this must be done by caller.
- * Will do everything else though, including resetting
- * the closestLocation if necessary.
- */
- public PublishHandlerSender(Message m, Node n) {
- this.node = n;
- key = (PublishStreamKey) m.getObject(DMT.KEY);
- target = key.toNormalizedDouble();
- uid = m.getLong(DMT.UID);
- packetNumber = m.getLong(DMT.STREAM_SEQNO);
- source = (PeerNode) m.getSource();
- packetData = ((ShortBuffer)m.getObject(DMT.DATA)).getData();
- closestLocation = m.getDouble(DMT.NEAREST_LOCATION);
- htl = m.getShort(DMT.HTL);
- double myLoc = n.lm.getLocation().getValue();
- if(Math.abs(myLoc - target) < Math.abs(closestLocation - target))
- closestLocation = myLoc;
- t = new Thread(this);
- t.setDaemon(true);
- t.start();
- }
-
- /** Very similar to InsertSender.run() */
- public void run() {
- Logger.minor(this, "Running "+this);
- try {
- // Copy it as it will be decrypted in place
- byte[] packetDataCopy = new byte[packetData.length];
- System.arraycopy(packetData, 0, packetDataCopy, 0,
packetData.length);
- node.subscriptions.receivedPacket(key, packetNumber,
packetDataCopy, source);
- short origHTL = htl;
- if(source != null) {
- Message msg = DMT.createFNPAccepted(uid);
- try {
- source.sendAsync(msg, null);
- } catch (NotConnectedException e) {
- Logger.normal(this, "Not connected sending Accepted on
"+this);
- }
- }
-
- HashSet nodesRoutedTo = new HashSet();
-
- // Don't check whether source is connected; forward anyway
-
- while(true) {
- if(htl == 0) {
- // Ran out of hops; success
- if(source != null) {
- Message msg = DMT.createFNPPublishDataSucceeded(uid);
- try {
- source.sendAsync(msg, null);
- } catch (NotConnectedException e) {
- Logger.minor(this, "Not connected sending
FNPPublishDataSucceeded on "+this);
- }
- }
- return;
- }
-
- // Route
- PeerNode next;
-
- // Can backtrack, so only route to nodes closer than we are to
target.
- double nextValue;
- synchronized(node.peers) {
- next = node.peers.closerPeer(source, nodesRoutedTo,
target, true);
- if(next != null)
- nextValue = next.getLocation().getValue();
- else
- nextValue = -1.0;
- }
-
- if(next == null) {
- // Backtrack
- Logger.minor(this, "No more routes");
- if(source != null) {
- Message msg = DMT.createFNPRouteNotFound(uid, htl);
- try {
- source.sendAsync(msg, null);
- } catch (NotConnectedException e) {
- Logger.normal(this, "Not connected when RNFing on
"+this);
- }
- } else {
- Logger.minor(this, "RNF on "+this);
- }
- return;
- }
- Logger.minor(this, "Routing insert to "+next);
- nodesRoutedTo.add(next);
-
- if(Math.abs(target - nextValue) > Math.abs(target -
closestLocation)) {
- Logger.minor(this, "Backtracking: target="+target+"
next="+nextValue+" closest="+closestLocation);
- htl = node.decrementHTL(source, htl);
- }
-
- // Now we send it
- /**
- * Possible responses:
- * Accepted
- * RejectedOverload
- * RejectedLoop
- * (timeout)
- */
- Message msg = DMT.createFNPPublishData(htl, packetData, key,
packetNumber, uid, closestLocation);
-
- MessageFilter mfAccepted =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
- MessageFilter mfRejectedLoop =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
- MessageFilter mfRejectedOverload =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
- MessageFilter mfPublishDataInvalid =
MessageFilter.create().setSource(next).setField(DMT.UID,
uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPPublishDataInvalid);
-
- // mfRejectedOverload must be the last thing in the or
- // So its or pointer remains null
- // Otherwise we need to recreate it below
- MessageFilter mf =
mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload.or(mfPublishDataInvalid)));
- mfRejectedOverload.clearOr();
-
- try {
- next.send(msg);
- } catch (NotConnectedException e) {
- Logger.minor(this, "Not connected to "+next+" - skipping");
- continue;
- }
-
- try {
- msg = null;
- msg = node.usm.waitFor(mf);
- } catch (DisconnectedException e1) {
- Logger.minor(this, "Not connected to "+next+" while
waiting, skipping");
- continue;
- }
-
- if(msg == null) {
- // Timeout waiting for Accepted
- Logger.normal(this, "Timed out waiting for Accepted on
"+this+" from "+next);
- continue;
- }
-
- if(msg.getSpec() == DMT.FNPRejectedLoop) {
- Logger.minor(this, "Rejected: loop for "+this);
- continue;
- }
-
- if(msg.getSpec() == DMT.FNPRejectedOverload) {
- // Propagate back to source; fatal
- if(source != null) {
- Message m = DMT.createFNPRejectedOverload(uid);
- try {
- source.sendAsync(m,null);
- } catch (NotConnectedException e2) {
- Logger.normal(this, "Source disconnected relaying
rejected:overload from "+next+" for "+this);
- }
- } else {
- Logger.normal(this, "FNPDataPublish rejected: overload
from "+next+" for "+this);
- }
- return;
- }
-
- if(msg.getSpec() != DMT.FNPAccepted) {
- throw new IllegalStateException("Unrecognized message:
"+msg+" while waiting for Accepted on "+this);
- }
-
- // Got an Accepted; wait for a success
-
- MessageFilter mfSucceeded =
MessageFilter.create().setTimeout(COMPLETE_TIMEOUT).setSource(next).setField(DMT.UID,
uid).setType(DMT.FNPPublishDataSucceeded);
- MessageFilter mfRNF =
MessageFilter.create().setTimeout(COMPLETE_TIMEOUT).setSource(next).setField(DMT.UID,
uid).setType(DMT.FNPRouteNotFound);
- mfRejectedOverload.clearOr();
- mfPublishDataInvalid.clearOr();
-
- mf =
mfSucceeded.or(mfRNF.or(mfRejectedOverload.or(mfPublishDataInvalid)));
-
- msg = null;
- try {
- msg = node.usm.waitFor(mf);
- } catch (DisconnectedException e3) {
- Logger.error(this, "Disconnected from next: "+next+" while
waiting for completion on "+this);
- continue; // can cause load multiplication, hence the
error; what else are we supposed to do though?
- }
-
- if(msg == null || msg.getSpec() == DMT.FNPRejectedOverload) {
- // Timeout
- Logger.error(this, "Timeout waiting for completion of
"+this+" : "+msg);
- // Propagate back to source; fatal
- if(source != null) {
- Message m = DMT.createFNPRejectedOverload(uid);
- try {
- source.sendAsync(m,null);
- } catch (NotConnectedException e2) {
- Logger.normal(this, "Source disconnected relaying
rejected:overload from "+next+" for "+this);
- }
- } else {
- Logger.normal(this, "FNPDataPublish rejected: overload
from "+next+" for "+this);
- }
- return;
- }
-
- if(msg.getSpec() == DMT.FNPRouteNotFound) {
- Logger.minor(this, "Rejected: RNF");
- // Still gets the data - but not yet
- short newHtl = msg.getShort(DMT.HTL);
- if(htl > newHtl) htl = newHtl;
- continue;
-
- }
-
- if(msg.getSpec() == DMT.FNPPublishDataInvalid) {
- Logger.error(this, "Got data invalid from "+next+" for
"+this);
- // FIXME: check validity ourself, propagate error if needed
- continue;
- }
-
- if(msg.getSpec() != DMT.FNPPublishDataSucceeded) {
- throw new IllegalStateException("Got unexpected message
"+msg+" waiting for completion on "+this);
- }
-
- // Success
- return;
- }
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t+" in "+this, t);
- }
- }
-
-}
Modified: trunk/freenet/src/freenet/node/SimpleClient.java
===================================================================
--- trunk/freenet/src/freenet/node/SimpleClient.java 2005-10-21 13:25:48 UTC
(rev 7443)
+++ trunk/freenet/src/freenet/node/SimpleClient.java 2005-10-21 13:34:42 UTC
(rev 7444)
@@ -23,23 +23,4 @@
* Insert a key.
*/
public void putCHK(ClientCHKBlock key);
-
- /**
- * Create a publish/subscribe stream key. Create any context
- * needed to insert it.
- */
- public ClientPublishStreamKey createPublishStream();
-
- /**
- * Publish a block of data to a publish/subscribe key.
- */
- public void publish(ClientPublishStreamKey key, byte[] data);
-
- /**
- * Subscribe to a stream.
- * @param key The stream key.
- * @param cb Callback to notify when we get data.
- * @return True if the subscribe succeeded.
- */
- public ClientSubscription subscribe(ClientPublishStreamKey key,
SubscriptionCallback cb);
}
Deleted: trunk/freenet/src/freenet/node/SubscriptionCallback.java
===================================================================
--- trunk/freenet/src/freenet/node/SubscriptionCallback.java 2005-10-21
13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/SubscriptionCallback.java 2005-10-21
13:34:42 UTC (rev 7444)
@@ -1,16 +0,0 @@
-package freenet.node;
-
-/**
- * Publish/Subscribe subscriber callback.
- */
-public interface SubscriptionCallback {
-
- void got(long packetNumber, byte[] data);
-
- void lostConnection();
-
- void restarted();
-
- void connected();
-
-}
Deleted: trunk/freenet/src/freenet/node/SubscriptionHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SubscriptionHandler.java 2005-10-21
13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/SubscriptionHandler.java 2005-10-21
13:34:42 UTC (rev 7444)
@@ -1,121 +0,0 @@
-package freenet.node;
-
-import freenet.keys.PublishStreamKey;
-import freenet.support.Logger;
-import freenet.support.NumberedItem;
-import freenet.support.NumberedRecentItems;
-
-/**
- * A single subscription.
- * May have a parent node, or may be the root.
- * May have many child nodes.
- */
-public class SubscriptionHandler {
-
- static final int KEEP_PACKETS = 32;
-
- final PublishStreamKey key;
- ClientSubscriptionHandler localSubscribers;
- PeerNode[] subscriberPeers;
- final NumberedRecentItems packets;
-
- public SubscriptionHandler(PublishStreamKey k) {
- key = k;
- subscriberPeers = null;
- packets = new NumberedRecentItems(KEEP_PACKETS, true);
- }
-
- /**
- * Set the local subscribers handler.
- * @return The previous local subscribers handler - should be null!
- */
- public synchronized ClientSubscriptionHandler
setLocal(ClientSubscriptionHandler csh) {
- ClientSubscriptionHandler h = localSubscribers;
- localSubscribers = csh;
- return h;
- }
-
- public synchronized boolean shouldDrop() {
- return subscriberPeersEmpty() && localSubscribers == null;
- }
-
- /**
- * @return True if there are no subscriber peers.
- */
- private final boolean subscriberPeersEmpty() {
- PeerNode[] peers = subscriberPeers;
- return (peers == null || peers.length == 0);
- }
-
- public void addSubscriberNode(PeerNode pn, long lastSeen) {
- synchronized(this) {
- if(subscriberPeers == null || subscriberPeers.length == 0) {
- subscriberPeers = new PeerNode[] { pn };
- } else {
- PeerNode[] peers = new PeerNode[subscriberPeers.length+1];
- System.arraycopy(subscriberPeers, 0, peers, 0,
subscriberPeers.length);
- peers[peers.length-1] = pn;
- subscriberPeers = peers;
- }
- }
- NumberedItem[] items = packets.getAfter(lastSeen);
- if(items == null || items.length == 0) return;
- for(int i=0;i<items.length;i++) {
- PacketItem item = (PacketItem)items[i];
- item.forwardTo(pn);
- }
- }
-
- /**
- * Process an incoming PublishData packet.
- */
- public void processPacket(long packetNumber, byte[] packetData, PeerNode
source) {
- // First, have we seen it before?
- PacketItem item = new PacketItem(packetNumber, packetData);
- if(!packets.add(item)) {
- Logger.minor(this, "Got packet "+packetNumber+" on stream "+key+"
twice");
- return;
- }
- PeerNode[] peers;
- // We don't strictly need to synchronize, but
- // if we don't we may lose packets.
- synchronized(this) {
- peers = subscriberPeers;
- }
- if(peers != null)
- for(int i=0;i<peers.length;i++)
- item.forwardTo(peers[i]);
-
- // Redistribute it to local subscribers
- localSubscribers.processPacket(packetNumber, packetData);
- }
-
- public class PacketItem implements NumberedItem {
-
- long packetNumber;
- byte[] data;
-
- public PacketItem(long packetNumber, byte[] packetData) {
- this.packetNumber = packetNumber;
- data = packetData;
- }
-
- /**
- * Forward this packet to a subscriber node.
- * As an FNPSubscribeData. NOT an FNPPublishData.
- * DO NOT CALL WITH LOCKS HELD, as sends packets.
- * @param pn Node to send to.
- */
- public void forwardTo(PeerNode pn) {
-
- // TODO Auto-generated method stub
-
- }
-
- public long getNumber() {
- return packetNumber;
- }
-
- }
-
-}
Deleted: trunk/freenet/src/freenet/node/SubscriptionManager.java
===================================================================
--- trunk/freenet/src/freenet/node/SubscriptionManager.java 2005-10-21
13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/SubscriptionManager.java 2005-10-21
13:34:42 UTC (rev 7444)
@@ -1,121 +0,0 @@
-package freenet.node;
-
-import java.util.HashMap;
-
-import freenet.keys.ClientPublishStreamKey;
-import freenet.keys.PublishStreamKey;
-import freenet.support.Logger;
-
-/**
- * Tracks Publish/Subscribe streams:
- * - Local subscriptions.
- * - Remote subscriptions (other nodes subscribing to a stream through us).
- * - Whether we are root for a given stream.
- * - Nodes that we subscribe through to get a given stream (i.e. our tree
parent).
- */
-public class SubscriptionManager {
-
- // We have up to 32 subscriptions
- private final int MAX_COUNT = 32;
-
- private final Node node;
- /** map: key -> sub. definitively has all subs. */
- private final HashMap subscriptionsByKey;
-// /** map: parent node -> linkedlist of subs. does not include
parent==null i.e. parent==this. */
-// private final HashMap subscriptionsByParent;
-// /** map: child node -> linkedlist of subs. some subs may have no
children so won't be included. */
-// private final HashMap subscriptionsByChildren;
- /** Client subscriptions. These are the ClientSubscriptionHandler's. */
- private final HashMap clientSubscriptionsByKey;
-
- SubscriptionManager(Node n) {
- node = n;
- subscriptionsByKey = new HashMap();
-// subscriptionsByParent = new HashMap();
-// subscriptionsByChildren = new HashMap();
- clientSubscriptionsByKey = new HashMap();
- }
-
- /**
- * Local subscription.
- * Add the stream if necessary, and subscribe to it.
- * @return Null if cannot subscribe, otherwise the ClientSubscription.
- */
- public ClientSubscription localSubscribe(ClientPublishStreamKey key,
SubscriptionCallback cb) {
- // FIXME implement doing a sub request
- // For now we will just eavesdrop on locally passed subs.
- // Each pretends it is the root
- ClientSubscriptionHandler csh;
- boolean add = false;
- Logger.minor(this, "Subscribing locally to "+key);
- synchronized(this) {
- csh = (ClientSubscriptionHandler)
clientSubscriptionsByKey.get(key);
- if(csh == null) {
- csh = new ClientSubscriptionHandler(this, key);
- add = true;
- clientSubscriptionsByKey.put(key, csh);
- }
- }
- if(add) {
- Logger.minor(this, "New subscription to "+key);
- SubscriptionHandler sub = makeSubscription(key.getKey());
- if(sub.setLocal(csh) != null) {
- Logger.error(this, "Had local already! for "+key);
- }
- }
- ClientSubscriptionImpl csi = new ClientSubscriptionImpl(key, cb, csh);
- csh.add(csi);
- // FIXME implement the rest - especially sending sub reqs out
- return csi;
- }
-
- /**
- * Create a back-end subscription. This can be called
- * as a result of a subscribe request or of a local
- * subscription.
- */
- private synchronized SubscriptionHandler makeSubscription(PublishStreamKey
key) {
- SubscriptionHandler sub = (SubscriptionHandler)
subscriptionsByKey.get(key);
- if(sub != null) return sub;
- if(subscriptionsByKey.size() >= MAX_COUNT) {
- Logger.normal(this, "Rejecting subscription for "+key);
- return null;
- }
-
- // Make a new one
- sub = new SubscriptionHandler(key);
- subscriptionsByKey.put(key, sub);
- return sub;
- }
-
- public synchronized void remove(ClientSubscriptionHandler handler) {
- PublishStreamKey key = handler.getKey().getKey();
- SubscriptionHandler sub = (SubscriptionHandler)
subscriptionsByKey.get(key);
- ClientSubscriptionHandler oldHandler = sub.setLocal(handler);
- if(oldHandler != null && oldHandler != handler) {
- Logger.error(this, "Already had a different handler:
"+oldHandler+" should be "+handler);
- }
- if(sub.shouldDrop()) {
- drop(sub, key);
- }
- }
-
- private synchronized void drop(SubscriptionHandler sub, PublishStreamKey
key) {
- subscriptionsByKey.remove(key);
- }
-
- /**
- * Handle a received packet.
- */
- public void receivedPacket(PublishStreamKey key, long packetNumber, byte[]
packetData, PeerNode source) {
- SubscriptionHandler sub;
- synchronized(this) {
- sub = (SubscriptionHandler) subscriptionsByKey.get(key);
- }
- if(sub == null) {
- Logger.normal(this, "Dropped sub packet from "+source+" on "+key+"
- not subscribed");
- return;
- }
- sub.processPacket(packetNumber, packetData, source);
- }
-}
Modified: trunk/freenet/src/freenet/node/TextModeClientInterface.java
===================================================================
--- trunk/freenet/src/freenet/node/TextModeClientInterface.java 2005-10-21
13:25:48 UTC (rev 7443)
+++ trunk/freenet/src/freenet/node/TextModeClientInterface.java 2005-10-21
13:34:42 UTC (rev 7444)
@@ -268,55 +268,6 @@
System.out.println("Threw: "+t);
t.printStackTrace();
}
- } else if(line.startsWith("PUBLISH:")) {
- line = line.substring("PUBLISH:".length());
- line = line.trim();
- System.out.println("Stream name: "+line);
- ClientPublishStreamKey key = n.createPublishStream();
- FreenetURI streamKey = key.getURI();
- streamKey = streamKey.setDocName(line);
- System.out.println("Stream key: "+streamKey);
- lastSendStreamName = line;
- streams.put(line, key);
- } else if(line.startsWith("PUSH:")) {
- // PUSH:<name>:<text>
- line = line.substring("PUSH:".length());
- line = line.trim();
- int index = line.indexOf(':');
- if(index == -1) {
- System.err.println("What do you want me to publish?");
- return;
- }
- String name = line.substring(0, index);
- ClientPublishStreamKey key = (ClientPublishStreamKey)
streams.get(name);
- if(key == null) {
- System.err.println("Could not find stream called "+name);
- } else {
- String content;
- if(line.length() > index+1) {
- content = line.substring(index+1);
- } else {
- content = readLines(reader, false);
- }
- System.out.println("Publishing to "+key);
- System.out.println("Data to publish:\n"+content);
- n.publish(key, content.getBytes("UTF-8"));
- lastSendStreamName = name;
- }
- } else if(uline.startsWith("SUBSCRIBE:")) {
- line = line.substring("SUBSCRIBE:".length());
- line = line.trim();
- try {
- FreenetURI uri = new FreenetURI(line);
- ClientPublishStreamKey key = new ClientPublishStreamKey(uri);
- n.subscribe(key, new MySubscriptionCallback(uri.getDocName()));
- System.out.println("Subscribed to "+uri.getDocName()+".");
- } catch (MalformedURLException e1) {
- System.err.println("Invalid URI: "+e1.getMessage());
- e1.printStackTrace();
- return;
- }
-
} else if(uline.startsWith("STATUS")) {
SimpleFieldSet fs = n.exportFieldSet();
System.out.println(fs.toString());
@@ -351,20 +302,6 @@
if(content.equals("")) return;
connect(content);
}
- } else if(uline.startsWith("SUBFILE:")) {
- String filename = line.substring("SUBFILE:".length()).trim();
- System.out.println("Writing all received subscription data to
"+filename);
- try {
- FileOutputStream fos = new FileOutputStream(filename, true);
- OutputStreamWriter w = new OutputStreamWriter(fos);
- // Test it
- w.write("Opened at "+System.currentTimeMillis()+" for writing
subscribed data\n");
- subscribedDataStream = w;
- w.flush();
- } catch (IOException e) {
- System.err.println("Could not use file: "+e.getMessage());
- }
-
} else if(uline.startsWith("NAME:")) {
System.out.println("Node name currently: "+n.myName);
String key = line.substring("NAME:".length());
@@ -374,18 +311,6 @@
key = key.substring(0, key.length()-2);
System.out.println("New name: "+key);
n.setName(key);
- } else if(uline.startsWith("SAY ") || uline.startsWith("SAY:")) {
- String toSay = line.substring("SAY:".length()).trim();
- if(lastSendStreamName != null) {
- ClientPublishStreamKey key = (ClientPublishStreamKey)
streams.get(lastSendStreamName);
- if(key == null) {
- System.err.println("Could not find stream called
"+lastSendStreamName);
- } else {
- System.out.println("Publishing to "+key);
- System.out.println("Data to publish:\n"+toSay);
- n.publish(key, toSay.getBytes("UTF-8"));
- }
- }
} else {
}
@@ -498,63 +423,4 @@
}
return sb.toString();
}
-
- /**
- *
- * SubscriptionCallback that dumps output to stdout.
- * FIXME this might block if stdout is redirected and disk is full...
- */
- public class MySubscriptionCallback implements SubscriptionCallback {
-
- final String name;
-
- public MySubscriptionCallback(String string) {
- name = string;
- }
-
- public void got(long packetNumber, byte[] data) {
- try {
- subscribedDataStream.write(name+":"+packetNumber+":"+new
String(data)+"\n");
- subscribedDataStream.flush();
- } catch (IOException e) {
- String s = "Error writing to subscriptions output file - disk
full? "+e.getMessage();
- Logger.error(this, s);
- System.err.println(s);
- }
- }
-
- public void lostConnection() {
- try {
- subscribedDataStream.write(name+":LOST CONNECTION\n");
- subscribedDataStream.flush();
- } catch (IOException e) {
- String s = "Error writing to subscriptions output file - disk
full? "+e.getMessage();
- Logger.error(this, s);
- System.err.println(s);
- }
- }
-
- public void restarted() {
- try {
- subscribedDataStream.write(name+":RESTARTED\n");
- subscribedDataStream.flush();
- } catch (IOException e) {
- String s = "Error writing to subscriptions output file - disk
full? "+e.getMessage();
- Logger.error(this, s);
- System.err.println(s);
- }
- }
-
- public void connected() {
- try {
- subscribedDataStream.write(name+":CONNECTED\n");
- subscribedDataStream.flush();
- } catch (IOException e) {
- String s = "Error writing to subscriptions output file - disk
full? "+e.getMessage();
- Logger.error(this, s);
- System.err.println(s);
- }
- }
- }
-
}
_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs