Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv29253/src/freenet
Modified Files:
PeerHandler.java PeerPacket.java PeerPacketMessage.java
Version.java
Log Message:
6234:
* Limit the volume of queryrejects sent to a node due to version being too old.
Includes exponential backoff.
* Dump low priority messages (e.g. most QueryRejected's) if we don't have a free conn
(reinstates pre-PH behaviour - bugfix).
* If a node is not contactable, and no free connections, discard the message.
* Bugfixes in PacketMessage.jobDone relating to losing message sent notifications.
* New diagnostics: messageSendTimeContactable, messageSendTimeNonContactable. Not all
old ones restored yet.
* Make sure we close files in FileEventList
* Add count of total files open by NativeFSDir to interesting objects dump.
* Indenting, logging
Index: PeerHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerHandler.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- PeerHandler.java 10 Oct 2003 23:26:59 -0000 1.13
+++ PeerHandler.java 11 Oct 2003 20:00:13 -0000 1.14
@@ -29,6 +29,11 @@
final Node node;
+ long lastMessageSentTime = -1;
+ Object rejectOldVersionLock = new Object();
+ final long initialRejectOldVersionTime = 250;
+ long rejectOldVersionTime = initialRejectOldVersionTime;
+
boolean logDEBUG;
int maxPacketSize;
@@ -240,7 +245,22 @@
connectionHandlers = new LinkedList();
logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
}
-
+
+ public long timeSinceLastMessageSent() {
+ return System.currentTimeMillis() - lastMessageSentTime;
+ }
+
+ public long rejectOldVersion(boolean reset) {
+ synchronized(rejectOldVersionLock) {
+ if(reset) {
+ rejectOldVersionTime = initialRejectOldVersionTime;
+ } else {
+ rejectOldVersionTime <<= 1;
+ }
+ return rejectOldVersionTime;
+ }
+ }
+
/**
* Start an asynchronous message send
* Call the callback provided when it is finished
@@ -248,7 +268,7 @@
public void sendMessageAsync(Message r, MessageSendCallback cb,
int msgPrio)
throws SendFailedException {
- PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio);
+ PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio, this);
innerSendMessageAsync(pm);
}
@@ -265,6 +285,7 @@
logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
if(logDEBUG) Core.logger.log(this, "Sending "+pm+" on "+this,
Logger.DEBUG);
+ lastMessageSentTime = System.currentTimeMillis();
int handlersSendingPackets = 0;
synchronized(connectionHandlers) {
for(Iterator e = connectionHandlers.listIterator(0);
@@ -316,6 +337,12 @@
Core.logger.log(this, "Failed to send packet, no more
conns, no way to open connection: DISCARDING "+pm+" on "+this, Logger.NORMAL);
return;
}
+ if(pm.priority == EXPENDABLE) {
+ Core.logger.log(this, "Discarding low priority message "+pm+"
on "+
+ this, Logger.MINOR);
+ pm.notifyFailure(null);
+ return;
+ }
synchronized(messages) {
if(pm.hasTrailer())
messagesWithTrailers.addLast(pm);
@@ -390,7 +417,7 @@
Core.logger.log(this, "getPacket("+link+","+p+","+i+","+m+","+
onlyGivenMsg+" on "+this,
Logger.DEBUG);
if(m != null) {
- packetMessages.add(new PeerPacketMessage(i, m, null, NORMAL));
+ packetMessages.add(new PeerPacketMessage(i, m, null, NORMAL,
this));
}
if(!onlyGivenMsg) {
boolean msgsEmpty = false;
@@ -410,8 +437,9 @@
msg.hasTrailer()) {
if(logDEBUG)
Core.logger.log(this, "Adding
"+msg+
-
" to packet for "+
-
this, Logger.DEBUG);
+
" to packet ("+
+
packetMessages.size()+
+
") for "+this, Logger.DEBUG);
packetMessages.addLast(msg);
packetLength += msg.getLength();
break;
@@ -461,7 +489,7 @@
logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
MyMessageSendCallback cb =
new MyMessageSendCallback();
- PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio);
+ PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio, this);
innerSendMessageAsync(pm);
Core.logger.log(this, "Sent "+pm+" async", Logger.DEBUG);
long startAt = System.currentTimeMillis();
@@ -475,7 +503,8 @@
if(maxTimeoutAt != -1 &&
System.currentTimeMillis() > maxTimeoutAt) {
Core.logger.log(this, "Took more than 5
minutes to send "+
- pm+"!!",
Logger.ERROR);
+ pm+"!!", ref
== null ? Logger.MINOR :
+ Logger.ERROR);
break;
}
try {
Index: PeerPacket.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacket.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- PeerPacket.java 10 Oct 2003 23:26:59 -0000 1.8
+++ PeerPacket.java 11 Oct 2003 20:00:14 -0000 1.9
@@ -85,8 +85,10 @@
* Can be zero. Bytes after this number might have been sent, but we got
* an error and can't be sure.
*/
- public void jobDone(boolean finished, int successfullySentBytes, Peer sentTo,
TrailerWriter tw) {
+ public void jobDone(boolean finished, int successfullySentBytes,
+ Peer sentTo, TrailerWriter tw) {
int msgStartOffset = 0;
+ int prevMsgStartOffset = 0;
boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG, this);
if(logDEBUG)
Core.logger.log(this, this.toString()+".jobDone("+finished+","+
@@ -94,14 +96,16 @@
Logger.DEBUG);
// Could unfold this a bit by keeping the offset in the array on the object
PeerPacketMessage prev = null;
- for(int i=0;i<messages.length;i++) {
+ for(int i=0;i<(messages.length+1);i++) {
if(logDEBUG)
- Core.logger.log(this, "Message "+i+": "+messages[i]+", prev="+
+ Core.logger.log(this, "Message "+i+": "+"prev="+
prev+", msgStartOffset="+msgStartOffset+
+ ", prevMsgStartOffset="+prevMsgStartOffset+
", finished="+finished+" for "+this,
Logger.DEBUG);
if((!finished) &&
msgStartOffset > successfullySentBytes) {
+ // msgStartOffset - if the prev message ends after where we have sent
up to, we can't do anything with it
if(logDEBUG)
Core.logger.log(this, "Finished loop in jobDone on "+this,
Logger.DEBUG);
@@ -109,11 +113,15 @@
break;
}
if(msgStartOffset < sentBytes) {
- msgStartOffset += messages[i].getLength();
- prev = messages[i];
+ // If the prev message ends before the bytes we sent last time end, we
have already processed it
+ prev = i >= messages.length ? null : messages[i];
+ prevMsgStartOffset = msgStartOffset;
+ msgStartOffset += prev == null ? 0 :
+ messages[i].getLength();
if(logDEBUG)
- Core.logger.log(this, "Skipping "+i+" in jobDone on "+this,
- Logger.DEBUG);
+ Core.logger.log(this, "Skipping "+i+" in jobDone on "+this+
+ ", successfullySentBytes="+
+ successfullySentBytes, Logger.DEBUG);
continue;
}
// msgStartOffset >= sentBytes
@@ -121,16 +129,23 @@
// Last time sent a whole message and no more
// Last message is dealt with
if(logDEBUG)
- Core.logger.log(this, "Prev message dealt with in jobDone on
"+this, Logger.DEBUG);
+ Core.logger.log(this, "Prev message "+prev+" dealt with in jobDone
on "+this, Logger.DEBUG);
} else {
if(logDEBUG)
- Core.logger.log(this, "Didn't finish last message for "+this,
- Logger.DEBUG);
+ Core.logger.log(this, "Didn't finish last message for "+this+
+ ": "+prev, Logger.DEBUG);
// msgStartOffset > sentBytes
// Last message was not dealt with last time
// prev != null because msgStartOffset > 0
int prevLen = prev.getLength();
- int diff = successfullySentBytes - msgStartOffset;
+ int diff = successfullySentBytes - prevMsgStartOffset;
+ if(logDEBUG)
+ Core.logger.log(this, "prevLen = "+prevLen+
+ ", successfullySentBytes="+
+ successfullySentBytes+", msgStartOffset="+
+ msgStartOffset+", prevMsgStartOffset="+
+ prevMsgStartOffset+", diff="+diff+
+ ", finished="+finished, Logger.DEBUG);
if(finished || diff >= prevLen) {
if(diff >= prevLen) {
prev.notifySuccess(tw);
@@ -147,35 +162,14 @@
}
}
}
- msgStartOffset += messages[i].getLength();
- prev = messages[i];
- }
- if(prev != null) {
- if(logDEBUG)
- Core.logger.log(this, "Finishing "+prev+" for jobDone for "+this,
- Logger.DEBUG);
- int prevLen = prev.getLength();
- boolean success = (successfullySentBytes == data.length);
- if(finished || success) {
- if(success)
- prev.notifySuccess(tw);
- else {
- int diff = successfullySentBytes - msgStartOffset;
- String excuse = "Sent "+
- (diff>=0 ? diff : 0) +
- " bytes ("+(getLength() - successfullySentBytes)+
- " of packet in notifyDone";
- SendFailedException sfe =
- new SendFailedException(sentTo.getAddress(),
- sentTo.getIdentity(),
- excuse, false);
- prev.notifyFailure(sfe);
- }
- }
+ prev = i >= messages.length ? null : messages[i];
+ prevMsgStartOffset = msgStartOffset;
+ msgStartOffset += prev == null ? 0 :
+ messages[i].getLength();
}
+ sentBytes = successfullySentBytes;
if(logDEBUG)
Core.logger.log(this, toString()+".jobDone finished",
Logger.DEBUG);
- sentBytes = successfullySentBytes;
}
}
Index: PeerPacketMessage.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacketMessage.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- PeerPacketMessage.java 10 Oct 2003 23:26:59 -0000 1.7
+++ PeerPacketMessage.java 11 Oct 2003 20:00:14 -0000 1.8
@@ -12,6 +12,7 @@
class PeerPacketMessage {
final Message msg;
final MessageSendCallback cb;
+ final PeerHandler ph;
final Identity id;
static ByteArrayOutputStream bais = new ByteArrayOutputStream();
boolean finished = false;
@@ -31,12 +32,13 @@
}
public PeerPacketMessage(Identity i, Message msg, MessageSendCallback cb,
- int priority) {
+ int priority, PeerHandler ph) {
+ startTime = System.currentTimeMillis();
this.id = i;
this.msg = msg;
this.cb = cb;
this.priority = priority;
- startTime = System.currentTimeMillis();
+ this.ph = ph;
}
/** Set the message up to send on a connection using a specific
@@ -109,9 +111,14 @@
return;
}
finished = true;
- Core.diagnostics.occurrenceContinuous("messageSendTime",
- System.currentTimeMillis() -
- startTime);
+ long sendTime = System.currentTimeMillis() - startTime;
+ Core.diagnostics.occurrenceContinuous("messageSendTime", sendTime);
+ if(ph.ref != null)
+ Core.diagnostics.occurrenceContinuous("messageSendTimeContactable",
+ sendTime);
+ else
+ Core.diagnostics.occurrenceContinuous("messageSendTimeNonContactable",
+ sendTime);
if(cb == null) return;
try {
if(tw != null) cb.setTrailerWriter(tw);
Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.426
retrieving revision 1.427
diff -u -r1.426 -r1.427
--- Version.java 11 Oct 2003 01:03:28 -0000 1.426
+++ Version.java 11 Oct 2003 20:00:15 -0000 1.427
@@ -18,7 +18,7 @@
public static String protocolVersion = "1.46";
/** The build number of the current revision */
- public static final int buildNumber = 6233;
+ public static final int buildNumber = 6234;
// 6028: may 3; ARK retrieval fix
public static final int ignoreBuildsAfter = 6500;
_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs