Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv11501/src/freenet
Modified Files:
ConnectionHandler.java ConnectionHandlerComparator.java
Core.java MessageSendCallback.java OpenConnectionManager.java
PeerHandler.java PeerPacket.java PeerPacketMessage.java
Version.java
Log Message:
6217:
Implement PeerHandler. Web interface works. FCP does not work. Probably has
significant bugs. This is unstable after all... and others may want to look at it.
Index: ConnectionHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v
retrieving revision 1.163
retrieving revision 1.164
diff -u -r1.163 -r1.164
--- ConnectionHandler.java 3 Oct 2003 08:35:19 -0000 1.163
+++ ConnectionHandler.java 4 Oct 2003 01:16:55 -0000 1.164
@@ -97,8 +97,6 @@
/** Count objects for number of sends in progress of pending */
//private final Count sending=new Count(0, sendLock);
// AFM (another fucking monitor)
- private volatile int sendingCount = 0;
-
private volatile long sendQueueSize = 0;
private volatile long totalDataSent =0;
@@ -114,6 +112,8 @@
private final Object trailerSendLock = new Object();
private int sendingTrailerChunkBytes = 0;
[...1819 lines suppressed...]
@@ -3069,6 +2792,10 @@
return peer;
}
+ public final PeerHandler getPeerHandler() {
+ return peerHandler;
+ }
+
/**
* @return identity of the Peer on the other end of the connection
*/
@@ -3451,7 +3178,7 @@
// }
public String toString() {
- return super.toString()+" for "+conn+","+link+", sending "+sentMessagesCount;
+ return super.toString()+" for "+conn+","+link+", sending "+sentPacket;
}
}
Index: ConnectionHandlerComparator.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandlerComparator.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- ConnectionHandlerComparator.java 20 Jul 2003 08:03:35 -0000 1.8
+++ ConnectionHandlerComparator.java 4 Oct 2003 01:16:55 -0000 1.9
@@ -72,7 +72,7 @@
case PEER_IDENTITY:
return
secondaryCompare(iSign,ch1.peerIdentity().toString().compareTo(ch2.peerIdentity().toString()),ch1,ch2);
case SENDING_COUNT:
- return secondaryCompare(iSign,new
Integer(ch1.sendingCount()).compareTo(new Integer(ch2.sendingCount())),ch1,ch2);
+ return secondaryCompare(iSign,new
Integer(ch1.sending() ? 1 : 0).compareTo(new Integer(ch2.sending() ? 1 : 0)),ch1,ch2);
case SENDQUEUE:
return secondaryCompare(iSign,new
Long(ch1.sendQueueSize()).compareTo(new Long(ch2.sendQueueSize())),ch1,ch2);
case RECEIVING:
Index: Core.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Core.java,v
retrieving revision 1.42
retrieving revision 1.43
diff -u -r1.42 -r1.43
--- Core.java 10 Sep 2003 21:19:51 -0000 1.42
+++ Core.java 4 Oct 2003 01:16:55 -0000 1.43
@@ -531,19 +531,19 @@
* or a new node did not respond to
* handshake.
*/
- public final ConnectionHandler makeConnection(Peer p)
- throws CommunicationException {
- return makeConnection(p, 0);
- }
+// public final ConnectionHandler makeConnection(Peer p)
+// throws CommunicationException {
+// return makeConnection(p, 0);
+// }
/**
* Returns an open connection to a node, or null if there aren't any
* free. Will ONLY take them from the cache, will not create new ones.
*/
- public final ConnectionHandler getConnection(Peer p)
- throws CommunicationException {
- return connections.findFreeConnection(p.getIdentity());
- }
+// public final ConnectionHandler getConnection(Peer p)
+// throws CommunicationException {
+// return connections.findFreeConnection(p.getIdentity());
+// }
/**
* Returns an open connection to a node, either by making a new one
@@ -553,39 +553,32 @@
* ConnectFailedException when establishing new
* connections.
*/
- public final ConnectionHandler makeConnection(Peer p, long timeout)
- throws CommunicationException {
- //if (connections == null)
- // throw new CoreException("Core not begun");
- return connections.getConnection(this, p, timeout);
- }
-
-
+// public final ConnectionHandler makeConnection(Peer p, long timeout)
+// throws CommunicationException {
+// //if (connections == null)
+// // throw new CoreException("Core not begun");
+// return connections.getConnection(this, p, timeout);
+// }
+
/**
* Send the message using the appropriate protocol over a free or
* new connection.
*/
- //public OutputStream sendMessage(Message m, Peer destination, long timeout)
- // throws CommunicationException {
- // try {
- // return makeConnection(destination, timeout).sendMessage(m);
- // } catch (ConnectFailedException e) {
- // throw new SendFailedException(e);
- // }
- //}
+ public final TrailerWriter sendMessage(Message m, Peer p, long timeout)
+ throws CommunicationException {
+ return connections.sendMessage(m, p.getIdentity(), null, timeout,
+ PeerHandler.NORMAL);
+ }
/**
* Send the message using the appropriate protocol over a free or
* new connection.
*/
- //public OutputStream sendMessage(Message m, Peer destination)
- // throws CommunicationException {
- // return sendMessage(m, destination, 0);
- //}
-
-
-
+ public final TrailerWriter sendMessage(Message m, Peer p)
+ throws CommunicationException {
+ return sendMessage(m, p, 0);
+ }
// Digest for signatures
private static final Digest ctx = SHA1.getInstance();
Index: MessageSendCallback.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/MessageSendCallback.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- MessageSendCallback.java 8 Sep 2003 17:03:03 -0000 1.2
+++ MessageSendCallback.java 4 Oct 2003 01:16:55 -0000 1.3
@@ -4,5 +4,5 @@
import java.io.OutputStream;
public interface MessageSendCallback extends ExceptionCallback {
- void setTrailerStream(OutputStream os); // must be called BEFORE the terminal success() or thrown()
+ void setTrailerWriter(TrailerWriter tw); // must be called BEFORE the terminal success() or thrown()
}
Index: OpenConnectionManager.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/OpenConnectionManager.java,v
retrieving revision 1.111
retrieving revision 1.112
diff -u -r1.111 -r1.112
--- OpenConnectionManager.java 3 Oct 2003 08:35:19 -0000 1.111
+++ OpenConnectionManager.java 4 Oct 2003 01:16:56 -0000 1.112
@@ -47,8 +47,11 @@
private final ThreadFactory tf;
private final MultiValueTable chs;
- private final Hashtable connectionJobs = new Hashtable();
// One attempt to open a conn at once to each Peer
+ private final Hashtable connectionJobs = new Hashtable();
+
+ // Map of Identity -> PeerHandler
+ private final Hashtable peerHandlers = new Hashtable();
private int openConns;
private Object openConnsSync = new Object();
@@ -56,8 +59,6 @@
private LinkedList closedList = new LinkedList();
private final LRUQueue lru = new LRUQueue();
- // Queue of blacklisted peers
- private final BlackLRUQueue blq = new BlackLRUQueue(60000,100,3);
private int maxConnections = -1;
private boolean logDEBUG = true;
@@ -76,14 +77,112 @@
chs = new MultiValueTable(50, 3);
openConns = 0;
}
-
+
+ public int countPeerHandlers() {
+ return peerHandlers.size();
+ }
+
+// public PeerHandler makePeerHandler(Peer p) {
+// Identity i = p.getIdentity();
+// synchronized(peerHandlers) {
+// PeerHandler ph = (PeerHandler)(peerHandlers.get(i));
+// if(ph != null) return ph;
+// ph = new PeerHandler(i, Main.node,
+// Main.node.getMaxPacketLength());
+// peerHandlers.put(p, ph);
+// return ph;
+// }
+// }
+
+ /**
+ * Get the NodeReference for the identity given, if it is available,
+ * from the peer handlers.
+ */
+ public NodeReference getNodeReference(Identity id) {
+ synchronized(peerHandlers) {
+ PeerHandler ph = (PeerHandler)(peerHandlers.get(id));
+ if(ph == null) return null;
+ return ph.getReference();
+ }
+ }
+
+ /**
+ * Add or update a peerhandler, called by Node.reference.
+ */
+ public PeerHandler addPeer(Identity id, NodeReference nr) {
+ synchronized(peerHandlers) {
+ PeerHandler ph = (PeerHandler)(peerHandlers.get(id));
+ if(ph != null) {
+ ph.updateReference(nr);
+ } else {
+ ph = new PeerHandler(id, nr, Main.node,
+
Main.node.getMaxPacketLength());
+ peerHandlers.put(id, ph);
+ }
+ return ph;
+ }
+ }
+
+ /**
+ * Does the connection specified need to open a new connection?
+ * If this returns false, messages will not be routed to that
+ * identity.
+ */
+ public boolean needsConnection(Identity i) {
+ synchronized(peerHandlers) {
+ PeerHandler ph = (PeerHandler)(peerHandlers.get(i));
+ if(ph == null) return false;
+ return ph.needsConnection();
+ }
+ }
+
+ private void removePeerHandler(Peer p) {
+ synchronized(peerHandlers) {
+ peerHandlers.remove(p);
+ }
+ }
+
+ public void addPeerHandler(Identity i, PeerHandler ph,
+ boolean allowedToFail) {
+ synchronized(peerHandlers) {
+ Object oph = peerHandlers.get(i);
+ if(oph != null) {
+ if(!allowedToFail)
+ Core.logger.log(this, "Not replacing "+oph+
+ " with "+ph+" for "+i,
+ new Exception("debug"),
+ Logger.NORMAL);
+ } else
+ peerHandlers.put(i, ph);
+ }
+ }
+
/**
* This method is package only, meant only for ConnectionHandler
* objects to add themselves.
*/
void put(ConnectionHandler ch) {
Identity i = ch.peerIdentity();
- if(i == null) throw new IllegalArgumentException("Must have a peerIdentity");
+ if(i == null)
+ throw new IllegalArgumentException("Must have a peerIdentity");
+ // Find Peer for ch
+ PeerHandler ph;
+ synchronized(peerHandlers) {
+ // REDFLAG: with sessionv2 we will know the Peer from negotiation
+ ph = (PeerHandler)(peerHandlers.get(i));
+ if(ph == null) {
+ // Hrrrrrrm
+ // Hopefully this won't result in a put()!
+ NodeReference ref = ch.targetReference();
+ if(ref == null)
+ ref = Main.node.rt.getNodeReference(i);
+ ph = new PeerHandler(i, ref, Main.node,
+
Main.node.getMaxPacketLength());
+ peerHandlers.put(i, ph);
+ }
+ }
+ // ph != null now
+ ch.setPeerHandler(ph); // will register on ph
Object syncOb = chs.getSync(i);
if(syncOb == null) {
// FIXME: adequately synchronized? I _think_ so because of chs's internal synchronization...
@@ -112,9 +211,29 @@
KillSurplusConnections();
}
+ public PeerHandler makePeerHandler(Identity i, NodeReference ref) {
+ synchronized(peerHandlers) {
+ PeerHandler ph = (PeerHandler)(peerHandlers.get(i));
+ if(ph != null) return ph;
+ if(ref == null)
+ ref = Main.node.rt.getNodeReference(i);
+ ph = new PeerHandler(i, ref, Main.node,
+
Main.node.getMaxPacketLength());
+ peerHandlers.put(i, ph);
+ return ph;
+ }
+ }
+
+ public PeerHandler getPeerHandler(Identity id) {
+ synchronized(peerHandlers) {
+ return (PeerHandler)(peerHandlers.get(id));
+ }
+ }
+
// Removing a connection that isn't in the OCM is a
// legal NOP.
ConnectionHandler remove(ConnectionHandler ch) {
+ // CH.terminate() will remove from PeerHandler
//if (ch.peerIdentity() == null) return null;
Identity id = ch.peerIdentity();
Object syncOb = chs.getSync(id);
@@ -147,89 +266,88 @@
* This will return an open and not busy ConnectionHandler to the
* node identified.
*/
- public ConnectionHandler findFreeConnection(Identity id) {
- logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
- if (logDEBUG)
- Core.logger.log(this, "findFreeConnection("+id+")",
- new Exception("debug"), Logger.DEBUG);
- Enumeration e = chs.getAll(id); //moved this out here --zab
- ConnectionHandler confirmed = null; // if we find one that is instantly suitable...
- HashSet candidates = new HashSet(); //put sending() && !reallySending() CHs here
- //so that we iterate over smaller collections
- Object syncOb = chs.getSync(id);
- if(syncOb == null) return null;
- synchronized(syncOb) {
- while (e.hasMoreElements()) {
- ConnectionHandler res = (ConnectionHandler)
e.nextElement();
- if (!res.isOpen()) {
- if(logDEBUG)
- Core.logger.log(this, "Skipped closed connection "+res,
- Logger.MINOR);
- // It will be terminated eventually
- // Do not remove it from OCM because it will then be orphaned and take up a fd even though it is not available for sending.
- } else if (!res.sending()) {
- if (logDEBUG)
- Core.logger.log(this, "Found "+res,
Logger.MINOR);
- confirmed = res;
- break;
- } else {
- if(!res.reallySending()){
- candidates.add(res);
- } else if (logDEBUG) {
- Core.logger.log(this, "Skipping: "+res+": sending",
- Logger.MINOR);
- }
- }
- }
- }
- if(confirmed != null) {
- // found one
- lru.push(confirmed);
- // Mark this connection as cached
- // so we know to discount it in
- // connection success accounting in
- // the Routing implementation.
- confirmed.setCached(true);
- return confirmed;
- }
- /*
- * imho this should be 1, not 2. by opening new connections too often
- * you starve the trailers and other messages and end up with a
feedback loop
- * I'll test it locally with 1 and report the results.
- * --zab
- * Probably. Although it'd be nice to have a spare open for a new
trailer.
- * Also possible problems with closed connections.
- * --amphibian
- */
- if(candidates.size() < 1) {
- Main.node.scheduleConnectionOpener(id);
- // open another one
- }
- if (candidates.size() == 0) //shortcut
- return null; //we may want to notify the user we've scheduled to open a new conn
+// public ConnectionHandler findFreeConnection(Identity id) {
+// logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
+// if (logDEBUG)
+// Core.logger.log(this, "findFreeConnection("+id+")",
+// new Exception("debug"), Logger.DEBUG);
+// Enumeration e = chs.getAll(id); //moved this out here --zab
+// ConnectionHandler confirmed = null; // if we find one that is instantly suitable...
+// HashSet candidates = new HashSet(); //put sending() && !reallySending() CHs here
+// //so that we iterate over smaller collections
+// Object syncOb = chs.getSync(id);
+// if(syncOb == null) return null;
+// synchronized(syncOb) {
+// while (e.hasMoreElements()) {
+// ConnectionHandler res = (ConnectionHandler) e.nextElement();
+// if (!res.isOpen()) {
+// if(logDEBUG)
+// Core.logger.log(this, "Skipped closed connection "+res,
+// Logger.MINOR);
+// // It will be terminated eventually
+// // Do not remove it from OCM because it will then be orphaned and take up a fd even though it is not available for sending.
+// } else if (!res.sending()) {
+// if (logDEBUG)
+// Core.logger.log(this, "Found "+res, Logger.MINOR);
+// confirmed = res;
+// break;
+// } else {
+// if(!res.reallySending()){
+// candidates.add(res);
+// } else if (logDEBUG) {
+// Core.logger.log(this, "Skipping: "+res+": sending",
+// Logger.MINOR);
+// }
+// }
+// }
+// }
+// if(confirmed != null) {
+// // found one
+// lru.push(confirmed);
+// // Mark this connection as cached
+// // so we know to discount it in
+// // connection success accounting in
+// // the Routing implementation.
+// confirmed.setCached(true);
+// return confirmed;
+// }
+// /*
+// * imho this should be 1, not 2. by opening new connections too often
+// * you starve the trailers and other messages and end up with a feedback loop
+// * I'll test it locally with 1 and report the results.
+// * --zab
+// * Probably. Although it'd be nice to have a spare open for a new trailer.
+// * Also possible problems with closed connections.
+// * --amphibian
+// */
+// if(candidates.size() < 1) {
+// Main.node.scheduleConnectionOpener(id);
+// // open another one
+// }
+// if (candidates.size() == 0) //shortcut
+// return null; //we may want to notify the user we've scheduled to open a new conn
- ConnectionHandler best = null;
- int bestVal = Integer.MAX_VALUE;
- for (Iterator i = candidates.iterator() ; i.hasNext() ; ) {
- ConnectionHandler res = (ConnectionHandler) i.next();
- if(res.useValue() < bestVal) {
- best = res;
- bestVal = res.useValue();
- }
-
- }
- if(best != null) {
- synchronized(lru) {
- lru.push(best); // sync(lru)
- best.setCached(true);
- }
- return best;
- }
- if(logDEBUG)
- Core.logger.log(this, "Couldn't find open connection for "+
- id, Logger.DEBUG);
- return null;
- }
+// ConnectionHandler best = null;
+// int bestVal = Integer.MAX_VALUE;
+// for (Iterator i = candidates.iterator() ; i.hasNext() ; ) {
+// ConnectionHandler res = (ConnectionHandler) i.next();
+// if(res.useValue() < bestVal) {
+// best = res;
+// bestVal = res.useValue();
+// }
+// }
+// if(best != null) {
+// synchronized(lru) {
+// lru.push(best); // sync(lru)
+// best.setCached(true);
+// }
+// return best;
+// }
+// if(logDEBUG)
+// Core.logger.log(this, "Couldn't find open connection for "+
+// id, Logger.DEBUG);
+// return null;
+// }
public void markClosed(ConnectionHandler ch) {
synchronized(closedList) {
@@ -243,33 +361,33 @@
* smallest value of sendQueueSize() divided by the preference value of
* the transport.
*/
- private ConnectionHandler findBestConnection(Identity id) {
- ConnectionHandler res, best = null;
- double resval, bestval = Double.POSITIVE_INFINITY;
- Object syncOb = chs.getSync(id);
- if(syncOb == null) return null;
- synchronized(syncOb) {
- for (Enumeration e = chs.getAll(id) ; e.hasMoreElements() ; ) {
- res = (ConnectionHandler) e.nextElement();
- if (!res.isOpen()) {
- if(logDEBUG)
- Core.logger.log(this, "Skipped closed connection "+res,
- Logger.DEBUG);
- // It will be terminated eventually
- // Do not remove it from OCM because it will then be orphaned and take up a fd even though it is not available for sending.
- } else {
- resval = res.sendQueueSize() /
- res.transport().preference();
+// private ConnectionHandler findBestConnection(Identity id) {
+// ConnectionHandler res, best = null;
+// double resval, bestval = Double.POSITIVE_INFINITY;
+// Object syncOb = chs.getSync(id);
+// if(syncOb == null) return null;
+// synchronized(syncOb) {
+// for (Enumeration e = chs.getAll(id) ; e.hasMoreElements() ; ) {
+// res = (ConnectionHandler) e.nextElement();
+// if (!res.isOpen()) {
+// if(logDEBUG)
+// Core.logger.log(this, "Skipped closed connection "+res,
+// Logger.DEBUG);
+// // It will be terminated eventually
+// // Do not remove it from OCM because it will then be orphaned and take up a fd even though it is not available for sending.
+// } else {
+// resval = res.sendQueueSize() /
+// res.transport().preference();
- if (resval < bestval) {
- bestval = resval;
- best = res;
- }
- }
- }
- return best;
- }
- }
+// if (resval < bestval) {
+// bestval = resval;
+// best = res;
+// }
+// }
+// }
+// return best;
+// }
+// }
public int countConnections(Identity id) {
Object syncOb = chs.getSync(id);
@@ -326,13 +444,6 @@
}
}
- public void blackList(Peer p){
- Core.logger.log(this, "blackListing " + p, Logger.DEBUG);
- if ( p != null && p.getIdentity() != null && p.getAddress() != null ) {
- blq.push(p);
- }
- }
-
/**
* Creates a new Connection which is started and added.
* @param c The Core to connect from
@@ -355,21 +466,6 @@
boolean updatedRefcount = false;
boolean weStarted = false;
- blq.clean();
- Core.logger.log(this, "Current blackListQueue size: " + blq.size() +
- ", Checking " + p,
- Logger.DEBUG);
- if ( blq.isBlackListed(p) ) {
- Core.logger.log(this, "Attempted to open connection for blackListed " +
- p,Logger.DEBUG);
- ConnectFailedException e =
- new ConnectFailedException(p.getAddress(),
- p.getIdentity(),
- "BlackListed",
- true);
- Core.logger.log(this, "Failed to connect: " + e, Logger.DEBUG);
- throw e;
- }
try {
synchronized(connectionJobs) {
if ( ( ct = (ConnectionJob)connectionJobs.get(p) ) != null ) {
@@ -383,7 +479,7 @@
ct.incRefcount();
}
}
-
+
if(ct == null) {
weStarted = true;
ct = new ConnectionJob(c, p);
@@ -472,6 +568,25 @@
}
}
+ public TrailerWriter sendMessage(Message m, Identity i, NodeReference ref,
+ long timeout,
int msgPrio)
+ throws SendFailedException {
+ return makePeerHandler(i, ref).sendMessage(m, timeout, msgPrio);
+ }
+
+ public void sendMessageAsync(Message m, Identity i, NodeReference ref,
+ MessageSendCallback
cb, int msgPrio)
+ throws SendFailedException {
+ makePeerHandler(i, ref).sendMessageAsync(m, cb, msgPrio);
+ }
+
+ public void unsendMessage(Identity i, MessageSendCallback cb) {
+ PeerHandler ph = getPeerHandler(i);
+ if(ph != null) {
+ ph.unsendMessage(cb);
+ }
+ }
+
/**
* Kills off one or more connections to ensure that we stay below the
connection limit
* This method is appropriate to call after a connection has been added to the
OCM
@@ -546,26 +661,27 @@
}
}
}
+
/**
* Returns a free connection, making a new one if none is available.
*/
- public ConnectionHandler getConnection(Core c, Peer p, long timeout)
- throws CommunicationException {
+// public ConnectionHandler getConnection(Core c, Peer p, long timeout)
+// throws CommunicationException {
- // Let the race begin!
- // If non-null it was open and free when findFreeConnection returned
- // but nothing guarantees that some other thread won't have sent a
- // message with a huge trailing field by the time you actually
- // try to send a message on it.
- Core.logger.log(this, "getConnection("+p+","+timeout+")",
- Logger.DEBUG);
- ConnectionHandler ch = findFreeConnection(p.getIdentity());
+// // Let the race begin!
+// // If non-null it was open and free when findFreeConnection returned
+// // but nothing guarantees that some other thread won't have sent a
+// // message with a huge trailing field by the time you actually
+// // try to send a message on it.
+// Core.logger.log(this, "getConnection("+p+","+timeout+")",
+// Logger.DEBUG);
+// ConnectionHandler ch = findFreeConnection(p.getIdentity());
- if (ch == null) {
- ch = createConnection(c, p, timeout);
- }
- return ch;
- }
+// if (ch == null) {
+// ch = createConnection(c, p, timeout);
+// }
+// return ch;
+// }
/**
* Gives the number of registered open connections.
@@ -714,11 +830,11 @@
String imageURL = "/servlet/images/aqua/arrow";
if(ch.outbound) imageURL += "_outbound";
else imageURL += "_inbound";
- if(ch.receiving() && ch.reallySending())
+ if(ch.receiving() && ch.sending()) //Can this happen yet?
imageURL += "_both";
else
if(ch.receiving()) imageURL += "_receiving";
- else if(ch.reallySending()) imageURL +=
"_transmitting";
+ else if(ch.sending()) imageURL +=
"_transmitting";
else imageURL += "_sleeping";
imageURL += ".png";
buffer.append("<center><img src='" + imageURL + "' height = '15' width = '24'>" +
@@ -777,7 +893,7 @@
//int x = ch.sendingCount();
//if(x > 0) sending++;;
//buffer.append(ch.sendingCount() + sep); Remove until there can actually be other values than 0 and 1 here
- if(ch.reallySending())
+ if(ch.sending())
sending++;
if(ch.receiving())
@@ -791,7 +907,7 @@
buffer.append(sepAlignRight+ch.messages());
if(useOldStyle || viewLevel > 0){
- if(ch.reallySending())
+ if(ch.sending())
buffer.append
(sepAlignRight +
(viewLevel > 1
@@ -826,7 +942,7 @@
).replaceAll(" "," ")
);
}else{
- if(ch.reallySending()||ch.receiving())
+ if(ch.sending()||ch.receiving())
buffer.append(sepAlignRight+format(ch.sendQueueSize()));
else
buffer.append(sepAlignRight+"-");
@@ -1281,13 +1397,14 @@
if(logDEBUG) Core.logger.log(this, "configged WSL for "+
ch+" ("+this+")", Logger.DEBUG);
- if (!core.hasInterfaceFor(ch.transport())) {
- // if we don't have an interface for this transport, we
- // will ask this connection to persist.
- Message m = ch.presentationType().getSustainMessage();
- if (m != null)
- ch.sendMessage(m);
- }
+ // IIRC we don't use sustain anymore: FIXME -- amphibian
+// if (!core.hasInterfaceFor(ch.transport())) {
+// // if we don't have an interface for this transport, we
+// // will ask this connection to persist.
+// Message m = ch.presentationType().getSustainMessage();
+// if (m != null)
+// ch.sendMessage(m);
+// }
long now = System.currentTimeMillis();
long connectingTime = now - start;
Index: PeerHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerHandler.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- PeerHandler.java 8 Sep 2003 17:03:03 -0000 1.1
+++ PeerHandler.java 4 Oct 2003 01:16:56 -0000 1.2
@@ -1,180 +1,382 @@
+/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
package freenet;
import java.util.*;
import java.io.OutputStream;
import freenet.Core;
import freenet.node.Node;
+import freenet.node.NodeReference;
import freenet.session.Link;
import freenet.support.Logger;
-class PeerHandler {
- Peer peer;
+/**
+ * PeerHandler
+ *
+ * Class managing and representing our communications with a remote peer.
+ * Contains message queue, connection and backoff logic, etc.
+ * Despite the name, a PeerHandler is unique by its Identity, and it keeps
+ * a NodeReference for purposes of opening connections.
+ */
+public class PeerHandler {
+ final Identity id; // the node's identity - never changes
- LinkedList messages;
+ NodeReference ref; // the current reference, can be null, meaning we can only talk back over open conns
- LinkedList connectionHandlers;
+ final LinkedList messages;
- Node node;
+ final LinkedList connectionHandlers;
+
+ final Node node;
int maxPacketSize;
-
- public PeerHandler(Peer p, Node n, int maxPacketSize) {
- this.peer = p;
- this.node = n;
- this.maxPacketSize = maxPacketSize;
+
+ // Packet priorities
+ public static final int EXPENDABLE = 0;
+ public static final int NORMAL = 1;
+
+ public String toString() {
+ return super.toString() + " ("+id+")";
+ }
+
+ /**
+ * Register a ConnectionHandler to us. This should be called as soon as
+ * it has completed negotiations, not waiting for the Identify.
+ */
+ public void registerConnectionHandler(ConnectionHandler ch) {
+ Core.logger.log(this, "Registering "+ch+" on "+this,
+ Logger.MINOR);
+ synchronized(connectionHandlers) {
+ connectionHandlers.add(ch);
+ }
+ }
+
+ /**
+ * Unregister a ConnectionHandler. This should be called after we
+ * have lost the ability to send and receive messages on the CH.
+ */
+ public void unregisterConnectionHandler(ConnectionHandler ch) {
+ Core.logger.log(this, "Unregistering "+ch+" on "+this,
+ Logger.MINOR);
+ synchronized(connectionHandlers) {
+ connectionHandlers.remove(ch);
+ }
+ if(connectionHandlers.size() == 0 &&
+ ref == null && !messages.isEmpty()) {
+ Core.logger.log(this, "Lost all connections for "+id+
+ ", but "+messages.size()+" messages left",
+ Logger.NORMAL);
+ }
+ }
+
+ /**
+ * Set the NodeReference. If the new NodeReference supersedes the old one,
+ * it will be set. This is the same logic used in the routing table.
+ * Should be called amongst other occasions by ConnectionHandler when we
+ * receive an Identify message.
+ * @return the current NodeReference
+ */
+ public NodeReference updateReference(NodeReference nr) {
+ if(nr.supersedes(ref)) ref = nr;
+ return ref;
+ }
+
+ public NodeReference getReference() {
+ return ref;
+ }
+
+ /**
+ * Do we need to open a new connection?
+ * If this returns false, messages will not be routed to that
+ * identity (see *Routing).
+ */
+ public boolean needsConnection() {
+ if(messages.isEmpty() &&
+ id == null || (!(node.rt.references(id)))) return false;
+ // FIXME: layering
+
+ synchronized(connectionHandlers) {
+ for(Iterator e = connectionHandlers.listIterator(0);
+ e.hasNext();) {
+ ConnectionHandler ch = (ConnectionHandler)(e.next());
+ if(!ch.isOpen()) continue;
+ if(ch.isSendingPacket()) continue;
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void unsendMessage(PeerPacketMessage pm) {
+ // FIXME: speed up - hashtable?
+ synchronized(messages) {
+ for(Iterator i = messages.listIterator(0); i.hasNext();) {
+ PeerPacketMessage cmp = (PeerPacketMessage)(i.next());
+ if(cmp == pm) {
+ i.remove();
+ return;
+ }
+ }
+ }
+ }
+
+ public void unsendMessage(MessageSendCallback cb) {
+ // FIXME: speed up - hashtable?
+ synchronized(messages) {
+ for(Iterator i = messages.listIterator(0); i.hasNext();) {
+ PeerPacketMessage cmp = (PeerPacketMessage)(i.next());
+ MessageSendCallback cmpCB = cmp.cb;
+ if(cb == cmpCB) {
+ i.remove();
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a PeerHandler
+ * @param id the identity of the peer. Null for an FCP connection.
+ * @param ref the initial NodeReference of the peer - can be null, meaning we
+ * don't know yet, but we can reply over currently open conns.
+ * @param n the Node.
+ * @param maxPacketSize the maximum length allowable for sending packets.
+ */
+ public PeerHandler(Identity id, NodeReference ref, Node n, int maxPacketSize) {
+ this.id = id;
+ this.ref = ref;
+ this.node = n;
+ this.maxPacketSize = maxPacketSize;
+ messages = new LinkedList();
+ connectionHandlers = new LinkedList();
}
/**
* Start an asynchronous message send
* Call the callback provided when it is finished
*/
- synchronized void sendMessageAsync(RawMessage r, MessageSendCallback cb)
- throws SendFailedException {
- PeerPacketMessage pm = new PeerPacketMessage(peer, r, cb);
- throw new UnsupportedOperationException("PeerHandler.sendMessageAsync");
- /* Possibilities:
- *
- * 1. No ConnectionHandlers (without trailing fields) open.
- * Strategy: open one, then send message
- * 2. Idle ConnectionHandlers
- * Strategy: pick one, make it send a single message packet
- * 3. No Idle ConnectionHandlers
- * Strategy: wait for one to ask us for a packet
- */
-// int handlersSendingPackets = 0;
-// for(Iterator e = connectionHandlers.listIterator(0);
-// e.hasNext();) {
-// ConnectionHandler ch =
-// (ConnectionHandler)(e.next());
-// if(!ch.isOpen()) {
-// // Can't send messages
-// Core.logger.log(this, ch.toString()+" can't send messages",
-// Logger.DEBUG);
-// e.remove();
-// continue;
-// }
-// // if(ch.isSendingPacket()) {
-// // handlersSendingPackets++;
-// // }
-// // Not busy!
-// sendSinglePacket(ch, pm);
-// return;
-// // No suitable ConnectionHandlers found!
-// // Queue the message
-// messages.addLast(pm);
-// if(handlersSendingPackets > 0) {
-// // One of them will call us
-// } else {
-// // Uh oh...
-// node.scheduleConnectionOpener(peer.getIdentity());
-// // When it registers with OCM, it will getPacket()
-// }
-// }
- }
+ public synchronized void sendMessageAsync(Message r, MessageSendCallback cb,
+
int msgPrio)
+ throws SendFailedException {
+ PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio);
+ innerSendMessageAsync(pm);
+ }
+
+ protected synchronized void innerSendMessageAsync(PeerPacketMessage pm) {
+ /* Possibilities:
+ *
+ * 1. No ConnectionHandlers (without trailing fields) open.
+ * Strategy: open one, then send message
+ * 2. Idle ConnectionHandlers
+ * Strategy: pick one, make it send a single message packet
+ * 3. No Idle ConnectionHandlers
+ * Strategy: wait for one to ask us for a packet
+ */
+ Core.logger.log(this, "Sending "+pm+" on "+this,
+ Logger.DEBUG);
+ int handlersSendingPackets = 0;
+ synchronized(connectionHandlers) {
+ for(Iterator e = connectionHandlers.listIterator(0);
+ e.hasNext();) {
+ ConnectionHandler ch =
+ (ConnectionHandler)(e.next());
+ if(!ch.isOpen()) {
+ // Can't send messages
+ Core.logger.log(this, ch.toString()+
+ " can't send messages for "+pm+" on "+
+ this, Logger.DEBUG);
+ e.remove();
+ continue;
+ }
+ if(ch.isSendingPacket()) {
+ Core.logger.log(this, ch.toString()+
+ " already sending a packet for "+pm+
+ " on "+this, Logger.DEBUG);
+ handlersSendingPackets++;
+ continue;
+ }
+ // Not busy!
+ Core.logger.log(this, ch.toString()+
+ ": send one packet "+pm+" for "+this,
+ Logger.DEBUG);
+ sendSinglePacket(ch, pm);
+ return;
+ }
+ }
+ // No suitable ConnectionHandlers found!
+ // Queue the message
+ Core.logger.log(this, "Queueing "+pm+" on "+this,
+ Logger.DEBUG);
+ if(id == null && handlersSendingPackets == 0) {
+ // Can't open a connection
+ pm.notifyFailure(null);
+ Core.logger.log(this, "Failed to send packet, no more conns: "+
+ pm+" on "+this, Logger.DEBUG);
+ return;
+ }
+ synchronized(messages) {
+ messages.addLast(pm);
+ }
+ if(handlersSendingPackets > 0) {
+ // One of them will call us
+ } else {
+ // Uh oh...
+ if(id != null)
+ node.scheduleConnectionOpener(id);
+ // When it registers with OCM, it will getPacket()
+ }
+ Core.diagnostics.occurrenceContinuous("messageSendQueueSize",
+
messages.size());
+ }
/**
- * Send one packet on one ConnectionHandler
+ * Send one message on one ConnectionHandler.
+ * @param ch the connection on which to send the message
+ * @param pm the single message to send
*/
protected void sendSinglePacket(ConnectionHandler ch,
- PeerPacketMessage pm) {
- PeerPacketMessage[] msgs = new PeerPacketMessage[] { pm };
- PeerPacket packet = new PeerPacket(msgs, ch.getLink());
- //ch.sendPacket(packet);
- throw new UnsupportedOperationException("sendSinglePacket");
+ PeerPacketMessage pm) {
+ PeerPacketMessage[] msgs = new PeerPacketMessage[] { pm };
+ PeerPacket packet = new PeerPacket(msgs, ch.getLink(),
+ ch.presentationType());
+ ch.sendPacket(packet, freenet.transport.WriteSelectorLoop.MESSAGE);
}
+ public final PeerPacket getPacket(Link link, Presentation p) {
+ return getPacket(link, p, null, null, false);
+ }
+
/**
* Get a packet to send. Called by a ConnectionHandler.
* It will then send it. synchronized so as to avoid isSendingPacket()
* race with sendMessageAsync. Returns null if nothing to send.
+ * @param m link specific message to prepend to the packet, or null
+ * if not needed. Used for such hacks as the Identify message.
+ * @param i identity for the message - not needed unless we m != null.
+ * @param onlyGivenMsg if true, only include the given message, used for
+ * starting a close dialog.
*/
- public synchronized PeerPacket getPacket(Link link) {
- /** THE RULES
- *
- * Collect as many messages as will fit in one packet.
- * If a message is bigger than the maximum, add it on to the packet
- * and send the oversized packet anyway.
- * Stop if a message includes a trailing field.
- */
- int packetLength = 0;
- LinkedList packetMessages = new LinkedList();
- while(!messages.isEmpty()) {
- PeerPacketMessage msg =
- (PeerPacketMessage)(messages.removeFirst());
- if((msg.getLength() > maxPacketSize) ||
- msg.hasTrailer()) {
- packetMessages.addLast(msg);
- packetLength += msg.getLength();
- break;
- }
- if(msg.getLength() + packetLength >
- maxPacketSize && (!packetMessages.isEmpty())) {
- messages.addFirst(msg);
- break;
- }
- packetLength += msg.getLength();
- }
- if(packetMessages.isEmpty()) return null;
- else {
- PeerPacketMessage[] msgs =
- new PeerPacketMessage[packetMessages.size()];
- packetMessages.toArray(msgs);
- return new PeerPacket(msgs, link);
- }
+ public synchronized PeerPacket getPacket(Link link, Presentation p,
+ Identity i, Message m,
+ boolean onlyGivenMsg) {
+ Core.diagnostics.occurrenceContinuous("messageSendQueueSize",
+ messages.size());
+ /** THE RULES
+ *
+ * Collect as many messages as will fit in one packet.
+ * If a message is bigger than the maximum, add it on to the packet
+ * and send the oversized packet anyway.
+ * Stop if a message includes a trailing field.
+ */
+ int packetLength = 0;
+ LinkedList packetMessages = new LinkedList();
+ if(m != null) {
+ packetMessages.add(new PeerPacketMessage(i, m, null, NORMAL));
+ }
+ if(!onlyGivenMsg) {
+ synchronized(messages) {
+ while(!messages.isEmpty()) {
+ PeerPacketMessage msg =
+ (PeerPacketMessage)(messages.removeFirst());
+ msg.resolve(p); // we need the length
+ if((msg.getLength() > maxPacketSize) ||
+ msg.hasTrailer()) {
+ packetMessages.addLast(msg);
+ packetLength += msg.getLength();
+ break;
+ }
+ if(msg.getLength() + packetLength >
+ maxPacketSize && (!packetMessages.isEmpty())) {
+ messages.addFirst(msg);
+ break;
+ }
+ packetLength += msg.getLength();
+ }
+ }
+ }
+ if(packetMessages.isEmpty()) return null;
+ else {
+ PeerPacketMessage[] msgs =
+ new PeerPacketMessage[packetMessages.size()];
+ packetMessages.toArray(msgs);
+ return new PeerPacket(msgs, link, p);
+ }
}
+ public TrailerWriter sendMessage(Message m) throws SendFailedException {
+ return sendMessage(m, 0, NORMAL);
+ }
+
/**
* Send a message synchronously
* @return a stream to write the trailing message to, or null
* if the message does not have a trailing field.
*/
- OutputStream sendMessage(RawMessage r) throws SendFailedException {
- MyMessageSendCallback cb =
- new MyMessageSendCallback();
- sendMessageAsync(r, cb);
- while(!cb.finished) {
- try {
- cb.wait();
- } catch (InterruptedException e) {}
- }
- if(cb.e != null) {
- if(cb.e instanceof SendFailedException)
- throw (SendFailedException)(cb.e);
- else {
- Core.logger.log(this, "Got unexpected exception: "+
- cb.e+" sending "+r, Logger.ERROR);
- SendFailedException e =
- new SendFailedException(peer.getAddress(),
- peer.getIdentity(),
- "Unexpected exception "+
- cb.e, false);
- e.initCause(cb.e);
- throw e;
- }
- } else
- return cb.trailerStream;
+ public TrailerWriter sendMessage(Message r, long timeout,
+ int msgPrio) throws SendFailedException {
+ MyMessageSendCallback cb =
+ new MyMessageSendCallback();
+ PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio);
+ innerSendMessageAsync(pm);
+ long timeoutAt = System.currentTimeMillis() + timeout;
+ synchronized(cb) {
+ while(!cb.finished) {
+ try {
+ long waitTime;
+ if(timeout > 0) {
+ waitTime = timeoutAt - System.currentTimeMillis();
+ if(waitTime < 0) break;
+ } else waitTime = 0;
+ cb.wait(waitTime);
+ } catch (InterruptedException e) {}
+ }
+ }
+ if(!cb.finished) {
+ // Couldn't send message
+ unsendMessage(pm);
+ throw new SendFailedException(null, // FIXME?
+ id, "Timed out sending message",
+ false);
+ }
+ if(cb.e != null) {
+ if(cb.e instanceof SendFailedException)
+ throw (SendFailedException)(cb.e);
+ else {
+ Core.logger.log(this, "Got unexpected exception: "+
+ cb.e+" sending "+r, Logger.ERROR);
+ SendFailedException e =
+ new SendFailedException(null, // FIXME?
+ id, "Unexpected exception "+
+ cb.e, false);
+ e.initCause(cb.e);
+ throw e;
+ }
+ } else
+ return cb.trailer;
}
+ /**
+ * Helper class used to emulate blocking sends.
+ */
class MyMessageSendCallback implements MessageSendCallback {
- boolean finished = false;
- Exception e = null;
- OutputStream trailerStream = null;
- public void succeeded() {
- finished = true;
- synchronized(this) {
- this.notify();
- }
- }
-
- public void thrown(Exception e) {
- this.e = e;
- finished = true;
- synchronized(this) {
- this.notify();
- }
- }
+ boolean finished = false;
+ Exception e = null;
+ TrailerWriter trailer = null;
+ public void succeeded() {
+ finished = true;
+ synchronized(this) {
+ this.notify();
+ }
+ }
+
+ public void thrown(Exception e) {
+ this.e = e;
+ finished = true;
+ synchronized(this) {
+ this.notify();
+ }
+ }
- public void setTrailerStream(OutputStream os) {
- trailerStream = os;
- }
+ public void setTrailerWriter(TrailerWriter tw) {
+ trailer = tw;
+ }
}
}
Index: PeerPacket.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacket.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- PeerPacket.java 18 Sep 2003 17:48:04 -0000 1.2
+++ PeerPacket.java 4 Oct 2003 01:16:56 -0000 1.3
@@ -3,17 +3,19 @@
import java.io.OutputStream;
class PeerPacket {
+ Presentation p;
PeerPacketMessage[] messages;
byte[] encryptedData; // plaintext. Encrypt at send time.
int sentBytes;
- PeerPacket(PeerPacketMessage[] msgs, Link l)
+ PeerPacket(PeerPacketMessage[] msgs, Link l, Presentation p)
throws IllegalArgumentException {
sentBytes = 0;
messages = msgs;
int totalLength = 0;
for(int i=0;i<msgs.length;i++) {
PeerPacketMessage m = msgs[i];
+ m.resolve(p);
totalLength += m.getLength();
if((i != (msgs.length-1)) && m.hasTrailer())
throw new IllegalArgumentException("trailers can only be attached to the LAST message!");
@@ -39,6 +41,10 @@
return encryptedData;
}
+ public int countMessages() {
+ return messages.length;
+ }
+
/**
* Notify clients of message send completion
* @param finished whether the packet has finished sending. If false,
@@ -47,7 +53,7 @@
* Can be zero. Bytes after this number might have been sent, but we got
* an error and can't be sure.
*/
- public void jobDone(boolean finished, int successfullySentBytes) {
+ public void jobDone(boolean finished, int successfullySentBytes, Peer sentTo) {
int msgStartOffset = 0;
// Could unfold this a bit by keeping the offset in the array on the object
PeerPacketMessage prev = null;
@@ -71,19 +77,44 @@
// Last message was not dealt with last time
// prev != null because msgStartOffset > 0
int prevLen = prev.getLength();
- int diff = msgStartOffset - sentBytes;
- if(finished ||
- ((successfullySentBytes - msgStartOffset) > prevLen))
- prev.notifyDone((successfullySentBytes - msgStartOffset) >
- prevLen);
+ int diff = successfullySentBytes - msgStartOffset;
+ if(finished || diff >= prevLen) {
+ if(diff >= prevLen) {
+ prev.notifySuccess();
+ } else {
+ String excuse = "Sent "+
+ (diff>=0 ? diff : 0) +
+ " bytes ("+(getLength() - successfullySentBytes)+
+ " of packet in notifyDone";
+ SendFailedException sfe =
+ new SendFailedException(sentTo.getAddress(),
+ sentTo.getIdentity(),
+ excuse, false);
+ prev.notifyFailure(sfe);
+ }
+ }
}
prev = messages[i];
}
if(prev != null) {
int prevLen = prev.getLength();
boolean success = (successfullySentBytes == encryptedData.length);
- if(finished || success)
- prev.notifyDone(success);
+ if(finished || success) {
+ if(success)
+ prev.notifySuccess();
+ else {
+ int diff = successfullySentBytes - msgStartOffset;
+ String excuse = "Sent "+
+ (diff>=0 ? diff : 0) +
+ " bytes ("+(getLength() - successfullySentBytes)+
+ " of packet in notifyDone";
+ SendFailedException sfe =
+ new SendFailedException(sentTo.getAddress(),
+ sentTo.getIdentity(),
+ excuse, false);
+ prev.notifyFailure(sfe);
+ }
+ }
}
}
}
Index: PeerPacketMessage.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacketMessage.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- PeerPacketMessage.java 8 Sep 2003 17:03:03 -0000 1.1
+++ PeerPacketMessage.java 4 Oct 2003 01:16:56 -0000 1.2
@@ -4,21 +4,56 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+/**
+ * PeerPacketMessage - a message to be sent to a particular peer node.
+ * Unencrypted, not yet attached to any particular connection.
+ * Used in PeerHandler's message queue, and as a part of a PeerPacket.
+ */
class PeerPacketMessage {
- final RawMessage raw;
- final byte[] content; // unencrypted - encrypt at PeerPacket level
+ final Message msg;
final MessageSendCallback cb;
- final Peer peer;
+ final Identity id;
static ByteArrayOutputStream bais = new ByteArrayOutputStream();
+ boolean finished = false;
+
+ final int priority;
+
+ // These are created when we resolve(Presentation)
+ Presentation p = null;
+ RawMessage raw = null;
+ byte[] content = null; // unencrypted - encrypt at PeerPacket level
+
public String toString() {
- return super.toString() + ":" + raw.toString() + ":" + cb;
+ return super.toString() + ":" + raw + ":" + cb;
}
- public PeerPacketMessage(Peer p, RawMessage raw, MessageSendCallback cb) {
- this.peer = p;
- this.raw = raw;
+ public PeerPacketMessage(Identity i, Message msg, MessageSendCallback cb,
+ int priority) {
+ this.id = i;
+ this.msg = msg;
this.cb = cb;
+ this.priority = priority;
+ }
+
+ /** Set the message up to send on a connection using a specific
+ * Presentation.
+ * @param p the presentation to use to transform the message into
+ * a RawMessage and thence to a byte array. Can be null to clear the
+ * cached message.
+ */
+ // Can be called multiple times
+ public void resolve(Presentation p) {
+ if(this.p == p)
+ return;
+ this.p = p;
+ if(p == null) {
+ this.p = null;
+ this.raw = null;
+ this.content = null;
+ return;
+ }
+ this.raw = msg.toRawMessage(p);
try {
synchronized(bais) {
bais.reset();
@@ -55,21 +90,43 @@
}
/**
- * Notify the callback, after having sent (or failed to send) the message
+ * Notify the callback that we successfully sent the message.
*/
- public void notifyDone(boolean success) {
+ public void notifySuccess() {
+ if(finished) {
+ Core.logger.log(this, "notifySuccess on "+this+" already finished!",
+ Logger.ERROR);
+ return;
+ }
+ finished = true;
if(cb == null) return;
try {
- if(success) {
- cb.succeeded();
- } else {
- SendFailedException e =
- new SendFailedException(peer.getAddress(), false);
- cb.thrown(e);
- }
+ cb.succeeded();
} catch (Throwable t) {
- Core.logger.log(this, toString()+".notifyDone() caught "+t,
+ Core.logger.log(this, toString()+".notifySuccess() caught "+t,
t, Logger.ERROR);
+ }
+ }
+
+ /**
+ * Notify the callback that we failed to send the message.
+ * @param detail the excuse exception
+ */
+ public void notifyFailure(SendFailedException detail) {
+ if(finished) {
+ Core.logger.log(this, "notifyFailure on "+this+" already finished!",
+ Logger.ERROR);
+ return;
+ }
+ finished = true;
+ if(cb == null) return;
+ try {
+ cb.thrown(detail);
+ } catch (Throwable t) {
+ Core.logger.log(this, toString()+".notifyFailure("+detail+") caught "+
+ t, t, Logger.ERROR);
+ Core.logger.log(this, toString()+".notifyFailure: detail was "+detail,
+ detail, Logger.ERROR);
}
}
}
Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.409
retrieving revision 1.410
diff -u -r1.409 -r1.410
--- Version.java 3 Oct 2003 08:35:19 -0000 1.409
+++ Version.java 4 Oct 2003 01:16:56 -0000 1.410
@@ -18,7 +18,7 @@
public static String protocolVersion = "1.46";
/** The build number of the current revision */
- public static final int buildNumber = 6216;
+ public static final int buildNumber = 6217;
// 6028: may 3; ARK retrieval fix
public static final int ignoreBuildsAfter = 6500;
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs