Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv29253/src/freenet

Modified Files:
        PeerHandler.java PeerPacket.java PeerPacketMessage.java 
        Version.java 
Log Message:
6234:
* Limit the volume of QueryRejected messages sent to a node because its version is too old. 
Includes exponential backoff.
* Dump low priority messages (e.g. most QueryRejected messages) if we don't have a free 
connection (reinstates pre-PeerHandler behaviour - bugfix).
* If a node is not contactable and there are no free connections, discard the message.
* Bugfixes in PeerPacket.jobDone relating to losing message sent notifications.
* New diagnostics: messageSendTimeContactable, messageSendTimeNonContactable. Not all 
old ones restored yet.
* Make sure we close files in FileEventList
* Add count of total files open by NativeFSDir to interesting objects dump.
* Indenting, logging


Index: PeerHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerHandler.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -r1.13 -r1.14
--- PeerHandler.java    10 Oct 2003 23:26:59 -0000      1.13
+++ PeerHandler.java    11 Oct 2003 20:00:13 -0000      1.14
@@ -29,6 +29,11 @@
        
     final Node node;
        
+       long lastMessageSentTime = -1;
+       Object rejectOldVersionLock = new Object();
+       final long initialRejectOldVersionTime = 250;
+       long rejectOldVersionTime = initialRejectOldVersionTime;
+       
        boolean logDEBUG;
     
     int maxPacketSize;
@@ -240,7 +245,22 @@
                connectionHandlers = new LinkedList();
                logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);
     }
-    
+       
+       public long timeSinceLastMessageSent() {
+               return System.currentTimeMillis() - lastMessageSentTime;
+       }
+       
+       public long rejectOldVersion(boolean reset) {
+               synchronized(rejectOldVersionLock) {
+                       if(reset) {
+                               rejectOldVersionTime = initialRejectOldVersionTime;
+                       } else {
+                               rejectOldVersionTime <<= 1;
+                       }
+                       return rejectOldVersionTime;
+               }
+       }
+       
     /**
      * Start an asynchronous message send
      * Call the callback provided when it is finished
@@ -248,7 +268,7 @@
     public void sendMessageAsync(Message r, MessageSendCallback cb,
                                                                 int msgPrio)
                throws SendFailedException {
-               PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio);
+               PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio, this);
                innerSendMessageAsync(pm);
        }
        
@@ -265,6 +285,7 @@
                logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);  
                if(logDEBUG) Core.logger.log(this, "Sending "+pm+" on "+this,
                                                Logger.DEBUG);
+               lastMessageSentTime = System.currentTimeMillis();
                int handlersSendingPackets = 0;
                synchronized(connectionHandlers) {
                        for(Iterator e = connectionHandlers.listIterator(0);
@@ -316,6 +337,12 @@
                                Core.logger.log(this, "Failed to send packet, no more 
conns, no way to open connection: DISCARDING "+pm+" on "+this, Logger.NORMAL);
                        return;
                }
+               if(pm.priority == EXPENDABLE) {
+                       Core.logger.log(this, "Discarding low priority message "+pm+" 
on "+
+                                                       this, Logger.MINOR);
+                       pm.notifyFailure(null);
+                       return;
+               }
                synchronized(messages) {
                        if(pm.hasTrailer())
                                messagesWithTrailers.addLast(pm);
@@ -390,7 +417,7 @@
                        Core.logger.log(this, "getPacket("+link+","+p+","+i+","+m+","+
                                                        onlyGivenMsg+" on "+this, 
Logger.DEBUG);
                if(m != null) {
-                       packetMessages.add(new PeerPacketMessage(i, m, null, NORMAL));
+                       packetMessages.add(new PeerPacketMessage(i, m, null, NORMAL, 
this));
                }
                if(!onlyGivenMsg) {
                        boolean msgsEmpty = false;
@@ -410,8 +437,9 @@
                                           msg.hasTrailer()) {
                                                if(logDEBUG)
                                                        Core.logger.log(this, "Adding 
"+msg+
-                                                                                      
 " to packet for "+
-                                                                                      
 this, Logger.DEBUG);
+                                                                                      
 " to packet ("+
+                                                                                      
 packetMessages.size()+
+                                                                                      
 ") for "+this, Logger.DEBUG);
                                                packetMessages.addLast(msg);
                                                packetLength += msg.getLength();
                                                break;
@@ -461,7 +489,7 @@
                logDEBUG = Core.logger.shouldLog(Logger.DEBUG,this);  
                MyMessageSendCallback cb =
                        new MyMessageSendCallback();
-               PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio);
+               PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio, this);
                innerSendMessageAsync(pm);
                Core.logger.log(this, "Sent "+pm+" async", Logger.DEBUG);
                long startAt = System.currentTimeMillis();
@@ -475,7 +503,8 @@
                                if(maxTimeoutAt != -1 &&
                                   System.currentTimeMillis() > maxTimeoutAt) {
                                        Core.logger.log(this, "Took more than 5 
minutes to send "+
-                                                                       pm+"!!", 
Logger.ERROR);
+                                                                       pm+"!!", ref 
== null ? Logger.MINOR :
+                                                                       Logger.ERROR);
                                        break;
                                }
                                try {

Index: PeerPacket.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacket.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- PeerPacket.java     10 Oct 2003 23:26:59 -0000      1.8
+++ PeerPacket.java     11 Oct 2003 20:00:14 -0000      1.9
@@ -85,8 +85,10 @@
      * Can be zero. Bytes after this number might have been sent, but we got
      * an error and can't be sure.
      */
-    public void jobDone(boolean finished, int successfullySentBytes, Peer sentTo, 
TrailerWriter tw) {
+    public void jobDone(boolean finished, int successfullySentBytes, 
+                       Peer sentTo, TrailerWriter tw) {
        int msgStartOffset = 0;
+       int prevMsgStartOffset = 0;
        boolean logDEBUG = Core.logger.shouldLog(Logger.DEBUG, this);
        if(logDEBUG)
            Core.logger.log(this, this.toString()+".jobDone("+finished+","+
@@ -94,14 +96,16 @@
                            Logger.DEBUG);
        // Could unfold this a bit by keeping the offset in the array on the object
        PeerPacketMessage prev = null;
-       for(int i=0;i<messages.length;i++) {
+       for(int i=0;i<(messages.length+1);i++) {
            if(logDEBUG)
-               Core.logger.log(this, "Message "+i+": "+messages[i]+", prev="+
+               Core.logger.log(this, "Message "+i+": "+"prev="+
                                prev+", msgStartOffset="+msgStartOffset+
+                               ", prevMsgStartOffset="+prevMsgStartOffset+
                                ", finished="+finished+" for "+this,
                                Logger.DEBUG);
            if((!finished) && 
               msgStartOffset > successfullySentBytes) {
+               // msgStartOffset - if the prev message ends after where we have sent 
up to, we can't do anything with it
                if(logDEBUG)
                    Core.logger.log(this, "Finished loop in jobDone on "+this,
                                    Logger.DEBUG);
@@ -109,11 +113,15 @@
                break;
            }
            if(msgStartOffset < sentBytes) {
-               msgStartOffset += messages[i].getLength();
-               prev = messages[i];
+               // If the prev message ends before the bytes we sent last time end, we 
have already processed it
+               prev = i >= messages.length ? null : messages[i];
+               prevMsgStartOffset = msgStartOffset;
+               msgStartOffset += prev == null ? 0 :
+                   messages[i].getLength();
                if(logDEBUG)
-                   Core.logger.log(this, "Skipping "+i+" in jobDone on "+this,
-                                   Logger.DEBUG);
+                   Core.logger.log(this, "Skipping "+i+" in jobDone on "+this+
+                                   ", successfullySentBytes="+
+                                   successfullySentBytes, Logger.DEBUG);
                continue;
            }
            // msgStartOffset >= sentBytes
@@ -121,16 +129,23 @@
                // Last time sent a whole message and no more
                // Last message is dealt with
                if(logDEBUG)
-                   Core.logger.log(this, "Prev message dealt with in jobDone on 
"+this, Logger.DEBUG);
+                   Core.logger.log(this, "Prev message "+prev+" dealt with in jobDone 
on "+this, Logger.DEBUG);
            } else {
                if(logDEBUG)
-                   Core.logger.log(this, "Didn't finish last message for "+this,
-                                   Logger.DEBUG);
+                   Core.logger.log(this, "Didn't finish last message for "+this+
+                                   ": "+prev, Logger.DEBUG);
                // msgStartOffset > sentBytes
                // Last message was not dealt with last time
                // prev != null because msgStartOffset > 0
                int prevLen = prev.getLength();
-               int diff = successfullySentBytes - msgStartOffset;
+               int diff = successfullySentBytes - prevMsgStartOffset;
+               if(logDEBUG)
+                   Core.logger.log(this, "prevLen = "+prevLen+
+                                   ", successfullySentBytes="+
+                                   successfullySentBytes+", msgStartOffset="+
+                                   msgStartOffset+", prevMsgStartOffset="+
+                                   prevMsgStartOffset+", diff="+diff+
+                                   ", finished="+finished, Logger.DEBUG);
                if(finished || diff >= prevLen) {
                    if(diff >= prevLen) {
                        prev.notifySuccess(tw);
@@ -147,35 +162,14 @@
                    }
                }
            }
-           msgStartOffset += messages[i].getLength();
-           prev = messages[i];
-       }
-       if(prev != null) {
-           if(logDEBUG)
-               Core.logger.log(this, "Finishing "+prev+" for jobDone for "+this,
-                               Logger.DEBUG);
-           int prevLen = prev.getLength();
-           boolean success = (successfullySentBytes == data.length);
-           if(finished || success) {
-               if(success)
-                   prev.notifySuccess(tw);
-               else {
-                   int diff = successfullySentBytes - msgStartOffset;
-                   String excuse = "Sent "+
-                       (diff>=0 ? diff : 0) +
-                           " bytes ("+(getLength() - successfullySentBytes)+
-                           " of packet in notifyDone";
-                       SendFailedException sfe = 
-                           new SendFailedException(sentTo.getAddress(), 
-                                                   sentTo.getIdentity(),
-                                                   excuse, false);
-                       prev.notifyFailure(sfe);
-               }
-           }
+           prev = i >= messages.length ? null : messages[i];
+           prevMsgStartOffset = msgStartOffset;
+           msgStartOffset += prev == null ? 0 :
+               messages[i].getLength();
        }
+       sentBytes = successfullySentBytes;
        if(logDEBUG)
            Core.logger.log(this, toString()+".jobDone finished",
                            Logger.DEBUG);
-       sentBytes = successfullySentBytes;
     }
 }

Index: PeerPacketMessage.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacketMessage.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- PeerPacketMessage.java      10 Oct 2003 23:26:59 -0000      1.7
+++ PeerPacketMessage.java      11 Oct 2003 20:00:14 -0000      1.8
@@ -12,6 +12,7 @@
 class PeerPacketMessage {
     final Message msg;
     final MessageSendCallback cb;
+    final PeerHandler ph;
     final Identity id;
     static ByteArrayOutputStream bais = new ByteArrayOutputStream();
     boolean finished = false;
@@ -31,12 +32,13 @@
     }
     
     public PeerPacketMessage(Identity i, Message msg, MessageSendCallback cb,
-                            int priority) {
+                            int priority, PeerHandler ph) {
+       startTime = System.currentTimeMillis();
        this.id = i;
        this.msg = msg;
        this.cb = cb;
        this.priority = priority;
-       startTime = System.currentTimeMillis();
+       this.ph = ph;
     }
     
     /** Set the message up to send on a connection using a specific
@@ -109,9 +111,14 @@
            return;
        }
        finished = true;
-       Core.diagnostics.occurrenceContinuous("messageSendTime",
-                                             System.currentTimeMillis() -
-                                             startTime);
+       long sendTime = System.currentTimeMillis() - startTime;
+       Core.diagnostics.occurrenceContinuous("messageSendTime", sendTime);
+       if(ph.ref != null)
+           Core.diagnostics.occurrenceContinuous("messageSendTimeContactable",
+                                                 sendTime);
+       else
+           Core.diagnostics.occurrenceContinuous("messageSendTimeNonContactable",
+                                                 sendTime);
        if(cb == null) return;
        try {
            if(tw != null) cb.setTrailerWriter(tw);

Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.426
retrieving revision 1.427
diff -u -r1.426 -r1.427
--- Version.java        11 Oct 2003 01:03:28 -0000      1.426
+++ Version.java        11 Oct 2003 20:00:15 -0000      1.427
@@ -18,7 +18,7 @@
     public static String protocolVersion = "1.46";
     
     /** The build number of the current revision */
-    public static final int buildNumber = 6233;
+    public static final int buildNumber = 6234;
     // 6028: may 3; ARK retrieval fix
 
     public static final int ignoreBuildsAfter = 6500;

_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to