Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv16753/src/freenet
Modified Files:
ConnectionHandler.java Message.java OpenConnectionManager.java
PeerHandler.java PeerPacket.java PeerPacketMessage.java
Version.java
Log Message:
6224:
More debugging of PeerHandler and supporting microarchitectural changes.
- Splitfile fetch between 2 test nodes works again, rather than failing with RNFs.
- Fix totalDataSent i.e. fix OCM's report of total data sent on a conn
- Encrypt data at send time, not in advance when the packet is built.
- Locking changes.
- Add hasTrailer() to Message. Implement it on all Messages.
- Make a distinction in needsConnection between "can we route to it?" and "do we
want a new connection if possible?".
- Keep separate queues for messages with and without trailers in PeerHandler.
- Messages without trailers take priority.
- needsConnection(true) returns false if we have no messages queued and no trailers
sending and we are not in the RT.
- Remove closed conns from the PeerHandler when we find them. They will still be in
the OCM.
- SendData: Count size of padding sent the same way we count the non-padding - don't
add it on and therefore don't finish until after we have sent it.
Add a TODO about the data: protocol
- Cancel outwardSender callback in Pending after we get Accepted. We may have a bug
causing notifications to be lost.
- Fix possible NPE in FnpLink.encryptBytes.
- Logging changes.
Also some fixes and optimizations since 6223 by other people.
Index: ConnectionHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v
retrieving revision 1.168
retrieving revision 1.169
diff -u -r1.168 -r1.169
--- ConnectionHandler.java 7 Oct 2003 20:03:39 -0000 1.168
+++ ConnectionHandler.java 8 Oct 2003 22:54:18 -0000 1.169
@@ -710,10 +710,13 @@
if(processLen == -1)
processLen = 0;
if(logDEBUG) {
try {
-
logDEBUG("Didn't get message for "+this+" from "+
-
decryptLen+" bytes: \n"+
-
new String(_accumulator, 0, decryptLen,
-
"ISO-8859-1"));
+
Core.logger.log(this, "Didn't get message for "+this+" from "+
+
decryptLen+" bytes: \n"+
+
new String(_accumulator, 0, decryptLen,
+
"ISO-8859-1"),
+
decryptLen > 2048 ?
+
Logger.ERROR :
+
Logger.DEBUG);
} catch
(UnsupportedEncodingException e) {
Core.logger.log(this, "Unsupported Encoding ISO-8859-1!",
e, Logger.ERROR);
@@ -1028,7 +1031,8 @@
//we should check the status if terminate() is called from elsewhere
if (!status) {
if(logDEBUG)
- logDEBUG("jobDone failed");
+ Core.logger.log(this, "jobDone failed for "+this,
+ new
Exception("debug"), Logger.DEBUG);
//tell everybody they failed
//this is where the PeerHandler will really help
// Locking!
@@ -1043,6 +1047,9 @@
return;
}
}
+ synchronized(sendLock) {
+ totalDataSent += size;
+ }
if (sendingTrailerChunk) {
if(logDEBUG)
logDEBUG("jobDone sending chunk");
@@ -1120,9 +1127,9 @@
}
boolean needTerminate = false; // don't terminate while holding locks!
if(sendClosed.state() && !sendingCloseMessage) return;
+ if(trailerSendID != -1) return;
if(logDEBUG)
logDEBUG("Trying to send a packet...");
- if(trailerSendID != -1) return;
// This is nasty...
// Only way to START a trailer send is through this function
// sentPacket -> null
@@ -1260,11 +1267,8 @@
throw new IllegalArgumentException();
sendingTrailerChunkBytes = length;
sendingTrailerChunk = true;
- // Encrypt the data
Link l = link;
- if(l != null)
- l.encryptBytes(block, offset, length);
- else {
+ if(l == null) {
IOException e =
new IOException("Connection closed in trailer
send!");
Core.logger.log(this, "Oops: "+e+" ("+this+")", e,
@@ -1924,6 +1928,12 @@
public void flushOut() {
}
+ protected Object sendBytesLock = new Object(); // force serialization to keep
cipher consistent - probably unnecessary... FIXME
+
+ /**
+ * Send some bytes
+ * Note that the toSend will be encrypted!
+ */
private void sendBytes(byte[] toSend, int off, int len, int priority) throws
IOException {
Core.logger.log(this, "Sending "+len+" bytes on "+this,
new Exception("debug"), Logger.DEBUG);
@@ -1938,8 +1948,11 @@
if(wsl == null)
throw new IllegalStateException("wsl null in "+
ConnectionHandler.this);
- if(!wsl.send(toSend, off, len, chan, ConnectionHandler.this,priority))
{
- throw new IOException("Can't write");
+ synchronized(sendBytesLock) {
+ link.encryptBytes(toSend, off, len);
+ if(!wsl.send(toSend, off, len, chan,
ConnectionHandler.this,priority)) {
+ throw new IOException("Can't write");
+ }
}
Core.logger.log(this, "Started send of "+len+" bytes on "+this,
new Exception("debug"), Logger.DEBUG);
@@ -2096,9 +2109,6 @@
}
if(logDEBUG) logDEBUG("MessageSend finished sending
message "+this);
done = true;
- synchronized(sendLock) {
- decSendQueue(toSend.length);
- }
} finally {
// only if there is a trailing field to write do we not
// decrement sendingCount and notify() immediately
@@ -2161,8 +2171,10 @@
return false;
}
if(trailerSendID != -1) {
- throw new
IllegalStateException("sendPacket("+packet+","+prio+
-
" called but sending a trailer!");
+ if(logDEBUG)
+ logDEBUG("sendPacket("+packet+","+prio+
+ " called but sending a
trailer!");
+ return false;
}
this.sentPacket = packet;
}
@@ -2170,6 +2182,8 @@
return true;
}
+ protected Object innerSendPacketLock = new Object();
+
/**
* Send a packet
* Do NOT call while synchronized on sentPacketLock - hence the argument
@@ -2177,6 +2191,7 @@
*/
protected void innerSendPacket(int prio, PeerPacket sentPacket) {
lastActiveTime = System.currentTimeMillis();
+ // Don't send 2 at once
byte[] toSend = sentPacket.getBytes();
try {
sendBytes(toSend, 0, toSend.length, prio);
@@ -2542,6 +2557,8 @@
* specified by the Presentation object if one can be created.
*/
public void close() {
+ if(logDEBUG)
+ logDEBUG("close() called");
if(sendingCloseMessage || sendClosed.state())
return;
sendingCloseMessage = true;
@@ -2561,6 +2578,8 @@
}
}
innerSendPacket(wsl.MESSAGE, sentPacket);
+ if(logDEBUG)
+ logDEBUG("Sent close packet");
}
/** Closes the connection utterly and finally. */
@@ -2737,23 +2756,6 @@
return receiveQueueSize;
}
- /**
- * Decreases the sendQueue with the specified amount
- */
- private void decSendQueue(long amount){
- //sendQueueSize -= Math.min(amount,sendQueueSize);
- // i am ever so slightly worried this syntax is bad with volatiles
- sendQueueSize = Math.max(sendQueueSize - amount, 0);
- totalDataSent += amount;
- }
-
- /**
- * Increases the sendQueue with the specified amount
- */
- private void incSendQueue(long amount){
- sendQueueSize += amount;
- }
-
/**
* Increases the receiveQueue with the specified amount
*/
@@ -3173,7 +3175,8 @@
// }
public String toString() {
- return super.toString()+" for "+conn+","+link+", sending "+sentPacket;
+ return super.toString()+" for "+conn+","+link+", sending
"+sentPacket+":"+
+ trailerSendID;
}
}
Index: Message.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Message.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- Message.java 15 Aug 2003 00:12:05 -0000 1.6
+++ Message.java 8 Oct 2003 22:54:18 -0000 1.7
@@ -143,6 +143,8 @@
return "freenet.Message: "+getMessageName()
+" @"+source+" @ "+Long.toHexString(id);
}
+
+ public abstract boolean hasTrailer();
}
Index: OpenConnectionManager.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/OpenConnectionManager.java,v
retrieving revision 1.115
retrieving revision 1.116
diff -u -r1.115 -r1.116
--- OpenConnectionManager.java 4 Oct 2003 23:37:26 -0000 1.115
+++ OpenConnectionManager.java 8 Oct 2003 22:54:18 -0000 1.116
@@ -137,15 +137,25 @@
}
/**
+ * See below, weak disabled.
+ */
+ public boolean needsConnection(Identity i) {
+ return needsConnection(i, false);
+ }
+
+ /**
* Does the connection specified need to open a new connection?
* If this returns false, messages will not be routed to that
* identity.
+ * @param weak if true, return whether we ideally would like
+ * a connection - if false, is more strict i.e. more likely
+ * to return false.
*/
- public boolean needsConnection(Identity i) {
+ public boolean needsConnection(Identity i, boolean weak) {
synchronized(peerHandlers) {
PeerHandler ph = (PeerHandler)(peerHandlers.get(i));
if(ph == null) return false;
- return ph.needsConnection();
+ return ph.needsConnection(weak);
}
}
Index: PeerHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerHandler.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- PeerHandler.java 7 Oct 2003 20:03:39 -0000 1.6
+++ PeerHandler.java 8 Oct 2003 22:54:18 -0000 1.7
@@ -23,6 +23,7 @@
NodeReference ref; // the current reference, can be null, meaning we can only
talk back over open conns
final LinkedList messages;
+ final LinkedList messagesWithTrailers;
final LinkedList connectionHandlers;
@@ -61,7 +62,8 @@
connectionHandlers.remove(ch);
}
if(connectionHandlers.size() == 0 &&
- ref == null && !messages.isEmpty()) {
+ ref == null &&
+ (!messages.isEmpty()) || (!messagesWithTrailers.isEmpty())) {
Core.logger.log(this, "Lost all connections for "+id+
", but "+messages.size()+"
messages left",
Logger.NORMAL);
@@ -88,29 +90,39 @@
* Do we need to open a new connection?
* If this returns false, messages will not be routed to that
* identity (see *Routing).
+ * @param weak if false, return whether we have no connections available
+ * to send messages on. If true, return whether we would like to have a
+ * new connection opened.
*/
- public boolean needsConnection() {
+ public boolean needsConnection(boolean weak) {
if(Core.logger.shouldLog(Logger.DEBUG, this))
Core.logger.log(this, "needsConnection(): "+
this+":
chcount="+connectionHandlers.size()+
- ",
messagescount="+messages.size(),
- Logger.DEBUG);
- if(messages.isEmpty() &&
- (id == null || (!(node.rt.references(id))))) {
- if(Core.logger.shouldLog(Logger.DEBUG, this))
- Core.logger.log(this, "returning false immediately",
- Logger.DEBUG);
- return false;
+ ",
messagescount="+messages.size()+
+ ", messagesWithTrailers="+
+ messagesWithTrailers.size(),
Logger.DEBUG);
+ boolean quitNow = false;
+ if(weak) {
+ quitNow = messages.isEmpty() && messagesWithTrailers.isEmpty()
&&
+ (id == null || (!(node.rt.references(id))));
+ if(quitNow && connectionHandlers.isEmpty()) {
+ if(Core.logger.shouldLog(Logger.DEBUG, this))
+ Core.logger.log(this, "returning false
immediately",
+ Logger.DEBUG);
+ return false;
+ }
}
// FIXME: layering
int closedCount = 0;
int sendingCount = 0;
+ int freeCount = 0;
synchronized(connectionHandlers) {
for(Iterator e = connectionHandlers.listIterator(0);
e.hasNext();) {
ConnectionHandler ch = (ConnectionHandler)(e.next());
if(!ch.isOpen()) {
+ e.remove(); // PeerHandler is about SENDING
messages - it will still be regd on OCM
closedCount++;
continue;
}
@@ -124,39 +136,58 @@
if(Core.logger.shouldLog(Logger.DEBUG, this))
Core.logger.log(this, "Found free conn for
"+this,
Logger.DEBUG);
- return false;
+ if(!weak) return false;
+ freeCount++;
}
}
+ if(weak && quitNow) {
+ // If we have no trailers being sent, and nothing queued,
+ // and not in RT, we don't need a new conn.
+ if(sendingCount == 0) return false;
+ }
if(Core.logger.shouldLog(Logger.DEBUG, this))
- Core.logger.log(this, "needsConnection(): "+
+ Core.logger.log(this, "needsConnection("+weak+"): "+
this+":
"+connectionHandlers.size()+
" closed "+closedCount+",
sending "+
- sendingCount, Logger.DEBUG);
+ sendingCount+", free
"+freeCount, Logger.DEBUG);
+ if(weak) {
+ if(freeCount < 2)
+ return true;
+ else
+ return false;
+ }
+ // If we get here, freeCount == 0
return true;
}
public void unsendMessage(PeerPacketMessage pm) {
- // FIXME: speed up - hashtable?
- synchronized(messages) {
- for(Iterator i = messages.listIterator(0); i.hasNext();) {
- PeerPacketMessage cmp = (PeerPacketMessage)(i.next());
- if(cmp == pm) {
- i.remove();
- return;
- }
- }
- }
+ unsendMessage(pm, null);
}
public void unsendMessage(MessageSendCallback cb) {
+ unsendMessage(null, cb);
+ }
+
+ /**
+ * Remove a message by either PeerPacketMessage == or
+ * by MessageSendCallback ==
+ * Note that they have to be exactly the same object.
+ * Note also that it will remove the first object found then return.
+ */
+ public void unsendMessage(PeerPacketMessage pm, MessageSendCallback cb) {
// FIXME: speed up - hashtable?
synchronized(messages) {
- for(Iterator i = messages.listIterator(0); i.hasNext();) {
- PeerPacketMessage cmp = (PeerPacketMessage)(i.next());
- MessageSendCallback cmpCB = cmp.cb;
- if(cb == cmpCB) {
- i.remove();
- return;
+ for(int type = 0; type < 2; type++) {
+ Iterator i =
+ (type == 0 ? messages : messagesWithTrailers).
+ listIterator(0);
+ for(; i.hasNext();) {
+ PeerPacketMessage cmp =
(PeerPacketMessage)(i.next());
+ if((pm != null && cmp == pm) ||
+ (cb != null && cmp.cb == cb)) {
+ i.remove();
+ return;
+ }
}
}
}
@@ -176,6 +207,7 @@
this.node = n;
this.maxPacketSize = maxPacketSize;
messages = new LinkedList();
+ messagesWithTrailers = new LinkedList();
connectionHandlers = new LinkedList();
}
@@ -184,7 +216,7 @@
* Call the callback provided when it is finished
*/
public void sendMessageAsync(Message r, MessageSendCallback cb,
-
int msgPrio)
+ int msgPrio)
throws SendFailedException {
PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio);
innerSendMessageAsync(pm);
@@ -254,7 +286,10 @@
return;
}
synchronized(messages) {
- messages.addLast(pm);
+ if(pm.hasTrailer())
+ messagesWithTrailers.addLast(pm);
+ else
+ messages.addLast(pm);
}
if(handlersSendingPackets > 0) {
// One of them will call us
@@ -265,7 +300,8 @@
// When it registers with OCM, it will getPacket()
}
Core.diagnostics.occurrenceContinuous("messageSendQueueSize",
-
messages.size());
+
messages.size()+
+
messagesWithTrailers.size());
}
/**
@@ -301,12 +337,13 @@
* starting a close dialog.
* @throws IOException if the connection is already closed
*/
- public synchronized PeerPacket getPacket(Link link, Presentation p,
+ public PeerPacket getPacket(Link link, Presentation p,
Identity i, Message m,
boolean onlyGivenMsg)
throws IOException {
Core.diagnostics.occurrenceContinuous("messageSendQueueSize",
-
messages.size());
+
messages.size()+
+
messagesWithTrailers.size());
/** THE RULES
*
* Collect as many messages as will fit in one packet.
@@ -320,10 +357,18 @@
packetMessages.add(new PeerPacketMessage(i, m, null, NORMAL));
}
if(!onlyGivenMsg) {
+ boolean msgsEmpty = false;
+ boolean msgsWTEmpty = false;
synchronized(messages) {
- while(!messages.isEmpty()) {
- PeerPacketMessage msg =
-
(PeerPacketMessage)(messages.removeFirst());
+ while((!(msgsEmpty = messages.isEmpty())) ||
+ (!(msgsWTEmpty =
messagesWithTrailers.isEmpty()))) {
+ PeerPacketMessage msg = null;
+ if(!msgsEmpty)
+ msg =
(PeerPacketMessage)(messages.removeFirst());
+ if(msg == null && !msgsWTEmpty)
+ msg = (PeerPacketMessage)
+
(messagesWithTrailers.removeFirst());
+ if(msg == null) break;
msg.resolve(p); // we need the length
if((msg.getLength() > maxPacketSize) ||
msg.hasTrailer()) {
@@ -333,9 +378,13 @@
}
if(msg.getLength() + packetLength >
maxPacketSize &&
(!packetMessages.isEmpty())) {
- messages.addFirst(msg);
+ if(msg.hasTrailer())
+
messagesWithTrailers.addFirst(msg);
+ else
+ messages.addFirst(msg);
break;
}
+ packetMessages.addLast(msg);
packetLength += msg.getLength();
}
}
Index: PeerPacket.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacket.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- PeerPacket.java 7 Oct 2003 00:47:30 -0000 1.5
+++ PeerPacket.java 8 Oct 2003 22:54:18 -0000 1.6
@@ -6,7 +6,7 @@
class PeerPacket {
Presentation p;
PeerPacketMessage[] messages;
- byte[] encryptedData; // plaintext. Encrypt at send time.
+ byte[] data; // plaintext. Encrypt at send time.
int sentBytes;
boolean hasTrailer = false;
@@ -29,25 +29,24 @@
hasTrailer = true;
}
}
- encryptedData = new byte[totalLength];
+ data = new byte[totalLength];
int x = 0;
for(int i=0;i<msgs.length;i++) {
byte[] msgBytes = msgs[i].getContent();
- System.arraycopy(msgBytes, 0, encryptedData, x, msgBytes.length);
+ System.arraycopy(msgBytes, 0, data, x, msgBytes.length);
x += msgBytes.length;
}
- l.encryptBytes(encryptedData,0,encryptedData.length);
}
public int getLength() {
- return encryptedData.length;
+ return data.length;
}
/**
* @return the actual packet bytes, after encryption
*/
public byte[] getBytes() {
- return encryptedData;
+ return data;
}
public boolean hasTrailer() {
@@ -116,7 +115,7 @@
}
if(prev != null) {
int prevLen = prev.getLength();
- boolean success = (successfullySentBytes == encryptedData.length);
+ boolean success = (successfullySentBytes == data.length);
if(finished || success) {
if(success)
prev.notifySuccess(tw);
Index: PeerPacketMessage.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacketMessage.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- PeerPacketMessage.java 7 Oct 2003 20:03:39 -0000 1.4
+++ PeerPacketMessage.java 8 Oct 2003 22:54:18 -0000 1.5
@@ -44,6 +44,10 @@
* cached message.
*/
public void resolve(Presentation p) {
+ if(Core.logger.shouldLog(Logger.DEBUG, this)) {
+ Core.logger.log(this, "resolve("+p+") for "+this,
+ Logger.DEBUG);
+ }
finished = false;
if(this.p == p)
return;
@@ -79,7 +83,7 @@
}
public boolean hasTrailer() {
- return raw.trailingFieldLength > 0;
+ return msg.hasTrailer();
}
public long trailerLength() {
Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.416
retrieving revision 1.417
diff -u -r1.416 -r1.417
--- Version.java 7 Oct 2003 20:03:39 -0000 1.416
+++ Version.java 8 Oct 2003 22:54:18 -0000 1.417
@@ -18,7 +18,7 @@
public static String protocolVersion = "1.46";
/** The build number of the current revision */
- public static final int buildNumber = 6223;
+ public static final int buildNumber = 6224;
// 6028: may 3; ARK retrieval fix
public static final int ignoreBuildsAfter = 6500;
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs