Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv16753/src/freenet

Modified Files:
        ConnectionHandler.java Message.java OpenConnectionManager.java 
        PeerHandler.java PeerPacket.java PeerPacketMessage.java 
        Version.java 
Log Message:
6224:
More debugging of PeerHandler and supporting microarchitectural changes.
- Splitfile fetch between 2 test nodes works again, rather than failing with RNFs.
- Fix totalDataSent i.e. fix OCM's report of total data sent on a conn
- Encrypt data at send time. Not in advance in packet.
- Locking changes.
- Add hasTrailer() to Message. Implement it on all Messages.
- Make a distinction between needsConnection as in can we route to it, or do we want a 
new connection if possible.
- Keep separate queues for messages with and without trailers in PeerHandler.
- Messages without trailers take priority.
- needConnection(true) returns false if we have no messages queued and no trailers 
sending and we are not in the RT.
- Remove closed conns from the PeerHandler when we find them. They will still be in 
the OCM.
- SendData: Count size of padding sent the same way we count the non-padding - don't 
add it on and therefore don't finish until after we have sent it.
Add a TODO about the data: protocol
- Cancel outwardSender callback in Pending after we get Accepted. We may have a bug 
causing notifications to be lost.
- Fix possible NPE in FnpLink.encryptBytes.
Logging

Also some fixes and optimizations since 6223 by other people.


Index: ConnectionHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v
retrieving revision 1.168
retrieving revision 1.169
diff -u -r1.168 -r1.169
--- ConnectionHandler.java      7 Oct 2003 20:03:39 -0000       1.168
+++ ConnectionHandler.java      8 Oct 2003 22:54:18 -0000       1.169
@@ -710,10 +710,13 @@
                                                                if(processLen == -1) 
processLen = 0;
                                                                if(logDEBUG) {
                                                                        try {
-                                                                               
logDEBUG("Didn't get message for "+this+" from "+
-                                                                                      
          decryptLen+" bytes: \n"+
-                                                                                      
          new String(_accumulator, 0, decryptLen, 
-                                                                                      
                                 "ISO-8859-1"));
+                                                                               
Core.logger.log(this, "Didn't get message for "+this+" from "+
+                                                                                      
                         decryptLen+" bytes: \n"+
+                                                                                      
                         new String(_accumulator, 0, decryptLen, 
+                                                                                      
                                            "ISO-8859-1"),
+                                                                                      
                         decryptLen > 2048 ?
+                                                                                      
                         Logger.ERROR :
+                                                                                      
                         Logger.DEBUG);
                                                                        } catch 
(UnsupportedEncodingException e) {
                                                                                
Core.logger.log(this, "Unsupported Encoding ISO-8859-1!", 
                                                                                       
                         e, Logger.ERROR);
@@ -1028,7 +1031,8 @@
                //we should check the status if terminate() is called from elsewhere 
                if (!status) {
                        if(logDEBUG)
-                               logDEBUG("jobDone failed");
+                               Core.logger.log(this, "jobDone failed for "+this,
+                                                               new 
Exception("debug"), Logger.DEBUG);
                        //tell everybody they failed 
                        //this is where the PeerHandler will really help
                        // Locking!
@@ -1043,6 +1047,9 @@
                                return;
                        }
                }
+               synchronized(sendLock) {
+                       totalDataSent += size;
+               }
                if (sendingTrailerChunk) {
                        if(logDEBUG)
                                logDEBUG("jobDone sending chunk");
@@ -1120,9 +1127,9 @@
                }
                boolean needTerminate = false; // don't terminate while holding locks!
                if(sendClosed.state() && !sendingCloseMessage) return;
+               if(trailerSendID != -1) return;
                if(logDEBUG)
                        logDEBUG("Trying to send a packet...");
-               if(trailerSendID != -1) return;
                // This is nasty...
                // Only way to START a trailer send is through this function
                // sentPacket -> null
@@ -1260,11 +1267,8 @@
                                throw new IllegalArgumentException();
                        sendingTrailerChunkBytes = length;
                        sendingTrailerChunk = true;
-                       // Encrypt the data
                        Link l = link;
-                       if(l != null)
-                               l.encryptBytes(block, offset, length);
-                       else {
+                       if(l == null) {
                                IOException e = 
                                        new IOException("Connection closed in trailer 
send!");
                                Core.logger.log(this, "Oops: "+e+" ("+this+")", e, 
@@ -1924,6 +1928,12 @@
     public void flushOut() {
     }
        
+       protected Object sendBytesLock = new Object(); // force serialization to keep 
cipher consistent - probably unnecessary... FIXME
+       
+       /**
+        * Send some bytes
+        * Note that the toSend will be encrypted!
+        */
        private void sendBytes(byte[] toSend, int off, int len, int priority) throws 
IOException {
                Core.logger.log(this, "Sending "+len+" bytes on "+this,
                                                new Exception("debug"), Logger.DEBUG);
@@ -1938,8 +1948,11 @@
                if(wsl == null) 
                        throw new IllegalStateException("wsl null in "+
                                                                                       
 ConnectionHandler.this);
-               if(!wsl.send(toSend, off, len, chan, ConnectionHandler.this,priority)) 
{
-                       throw new IOException("Can't write");
+               synchronized(sendBytesLock) {
+                       link.encryptBytes(toSend, off, len);
+                       if(!wsl.send(toSend, off, len, chan, 
ConnectionHandler.this,priority)) {
+                               throw new IOException("Can't write");
+                       }
                }
                Core.logger.log(this, "Started send of "+len+" bytes on "+this,
                                                new Exception("debug"), Logger.DEBUG);
@@ -2096,9 +2109,6 @@
                                }
                                if(logDEBUG) logDEBUG("MessageSend finished sending 
message "+this);
                                done = true;
-                               synchronized(sendLock) {
-                                       decSendQueue(toSend.length);
-                               }
                        } finally {
                                // only if there is a trailing field to write do we not
                                // decrement sendingCount and notify() immediately
@@ -2161,8 +2171,10 @@
                                return false;
                        }
                        if(trailerSendID != -1) {
-                               throw new 
IllegalStateException("sendPacket("+packet+","+prio+
-                                                                                      
         " called but sending a trailer!");
+                               if(logDEBUG) 
+                                       logDEBUG("sendPacket("+packet+","+prio+
+                                                        " called but sending a 
trailer!");
+                               return false;
                        }
                        this.sentPacket = packet;
                }
@@ -2170,6 +2182,8 @@
                return true;
        }
        
+       protected Object innerSendPacketLock = new Object();
+       
        /**
         * Send a packet
         * Do NOT call while synchronized on sentPacketLock - hence the argument
@@ -2177,6 +2191,7 @@
         */
        protected void innerSendPacket(int prio, PeerPacket sentPacket) {
                lastActiveTime = System.currentTimeMillis();
+               // Don't send 2 at once
                byte[] toSend = sentPacket.getBytes();
                try {
                        sendBytes(toSend, 0, toSend.length, prio);
@@ -2542,6 +2557,8 @@
      *  specified by the Presentation object if one can be created.
      */
     public void close() {
+               if(logDEBUG)
+                       logDEBUG("close() called");
                if(sendingCloseMessage || sendClosed.state())
                        return;
                sendingCloseMessage = true;
@@ -2561,6 +2578,8 @@
                        }
                }
                innerSendPacket(wsl.MESSAGE, sentPacket);
+               if(logDEBUG)
+                       logDEBUG("Sent close packet");
     }
        
     /**  Closes the connection utterly and finally. */
@@ -2737,23 +2756,6 @@
                return receiveQueueSize;
        }
     
-    /**
-     * Decreases the sendQueue with the specified amount
-        */
-    private void decSendQueue(long amount){
-               //sendQueueSize -= Math.min(amount,sendQueueSize);
-               // i am ever so slightly worried this syntax is bad with volatiles
-       sendQueueSize = Math.max(sendQueueSize - amount, 0);
-               totalDataSent += amount;
-    }
-    
-       /**
-        * Increases the sendQueue with the specified amount
-        */
-       private void incSendQueue(long amount){
-               sendQueueSize += amount;
-       }
-       
        /**
         * Increases the receiveQueue with the specified amount
         */
@@ -3173,7 +3175,8 @@
 //     }
        
        public String toString() {
-               return super.toString()+" for "+conn+","+link+", sending "+sentPacket;
+               return super.toString()+" for "+conn+","+link+", sending 
"+sentPacket+":"+
+                       trailerSendID;
        }
 }
 

Index: Message.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Message.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- Message.java        15 Aug 2003 00:12:05 -0000      1.6
+++ Message.java        8 Oct 2003 22:54:18 -0000       1.7
@@ -143,6 +143,8 @@
         return "freenet.Message: "+getMessageName()
                 +" @"+source+" @ "+Long.toHexString(id);
     }
+    
+    public abstract boolean hasTrailer();
 }
 
 

Index: OpenConnectionManager.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/OpenConnectionManager.java,v
retrieving revision 1.115
retrieving revision 1.116
diff -u -r1.115 -r1.116
--- OpenConnectionManager.java  4 Oct 2003 23:37:26 -0000       1.115
+++ OpenConnectionManager.java  8 Oct 2003 22:54:18 -0000       1.116
@@ -137,15 +137,25 @@
        }
        
        /**
+        * See below, weak disabled.
+        */
+       public boolean needsConnection(Identity i) {
+               return needsConnection(i, false);
+       }
+       
+       /**
         * Does the connection specified need to open a new connection?
         * If this returns false, messages will not be routed to that 
         * identity.
+        * @param weak if true, return whether we ideally would like
+        * a connection - if false, is more strict i.e. more likely
+        * to return false.
         */
-       public boolean needsConnection(Identity i) {
+       public boolean needsConnection(Identity i, boolean weak) {
                synchronized(peerHandlers) {
                        PeerHandler ph = (PeerHandler)(peerHandlers.get(i));
                        if(ph == null) return false;
-                       return ph.needsConnection();
+                       return ph.needsConnection(weak);
                }
        }
        

Index: PeerHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerHandler.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- PeerHandler.java    7 Oct 2003 20:03:39 -0000       1.6
+++ PeerHandler.java    8 Oct 2003 22:54:18 -0000       1.7
@@ -23,6 +23,7 @@
     NodeReference ref; // the current reference, can be null, meaning we can only 
talk back over open conns
     
     final LinkedList messages;
+       final LinkedList messagesWithTrailers;
     
     final LinkedList connectionHandlers;
        
@@ -61,7 +62,8 @@
                        connectionHandlers.remove(ch);
                }
                if(connectionHandlers.size() == 0 &&
-                  ref == null && !messages.isEmpty()) {
+                  ref == null && 
+                  (!messages.isEmpty()) || (!messagesWithTrailers.isEmpty())) {
                        Core.logger.log(this, "Lost all connections for "+id+
                                                        ", but "+messages.size()+" 
messages left",
                                                        Logger.NORMAL);
@@ -88,29 +90,39 @@
         * Do we need to open a new connection?
         * If this returns false, messages will not be routed to that 
         * identity (see *Routing).
+        * @param weak if false, return whether we have no connections available
+        * to send messages on. If true, return whether we would like to have a
+        * new connection opened.
         */
-    public boolean needsConnection() {
+    public boolean needsConnection(boolean weak) {
                if(Core.logger.shouldLog(Logger.DEBUG, this))
                        Core.logger.log(this, "needsConnection(): "+
                                                        this+": 
chcount="+connectionHandlers.size()+
-                                                       ", 
messagescount="+messages.size(),
-                                                       Logger.DEBUG);
-               if(messages.isEmpty() &&
-                  (id == null || (!(node.rt.references(id))))) {
-                       if(Core.logger.shouldLog(Logger.DEBUG, this))
-                               Core.logger.log(this, "returning false immediately",
-                                                               Logger.DEBUG);
-                       return false;
+                                                       ", 
messagescount="+messages.size()+
+                                                       ", messagesWithTrailers="+
+                                                       messagesWithTrailers.size(), 
Logger.DEBUG);
+               boolean quitNow = false;
+               if(weak) {
+                       quitNow = messages.isEmpty() && messagesWithTrailers.isEmpty() 
&&
+                               (id == null || (!(node.rt.references(id))));
+                       if(quitNow && connectionHandlers.isEmpty()) {
+                               if(Core.logger.shouldLog(Logger.DEBUG, this))
+                                       Core.logger.log(this, "returning false 
immediately",
+                                                                       Logger.DEBUG);
+                               return false;
+                       }
                }
                // FIXME: layering
                
                int closedCount = 0;
                int sendingCount = 0;
+               int freeCount = 0;
                synchronized(connectionHandlers) {
                        for(Iterator e = connectionHandlers.listIterator(0);
                                e.hasNext();) {
                                ConnectionHandler ch = (ConnectionHandler)(e.next());
                                if(!ch.isOpen()) {
+                                       e.remove(); // PeerHandler is about SENDING 
messages - it will still be regd on OCM
                                        closedCount++;
                                        continue;
                                }
@@ -124,39 +136,58 @@
                                if(Core.logger.shouldLog(Logger.DEBUG, this))
                                        Core.logger.log(this, "Found free conn for 
"+this,
                                                                        Logger.DEBUG);
-                               return false;
+                               if(!weak) return false;
+                               freeCount++;
                        }
                }
+               if(weak && quitNow) {
+                       // If we have no trailers being sent, and nothing queued,
+                       // and not in RT, we don't need a new conn.
+                       if(sendingCount == 0) return false;
+               }
                if(Core.logger.shouldLog(Logger.DEBUG, this))
-                       Core.logger.log(this, "needsConnection(): "+
+                       Core.logger.log(this, "needsConnection("+weak+"): "+
                                                        this+": 
"+connectionHandlers.size()+
                                                        " closed "+closedCount+", 
sending "+
-                                                       sendingCount, Logger.DEBUG);
+                                                       sendingCount+", free 
"+freeCount, Logger.DEBUG);
+               if(weak) {
+                       if(freeCount < 2)
+                               return true;
+                       else
+                               return false;
+               }
+               // If we get here, freeCount == 0
                return true;
     }
     
        public void unsendMessage(PeerPacketMessage pm) {
-               // FIXME: speed up - hashtable?
-               synchronized(messages) {
-                       for(Iterator i = messages.listIterator(0); i.hasNext();) {
-                               PeerPacketMessage cmp = (PeerPacketMessage)(i.next());
-                               if(cmp == pm) {
-                                       i.remove();
-                                       return;
-                               }
-                       }
-               }
+               unsendMessage(pm, null);
        }
        
        public void unsendMessage(MessageSendCallback cb) {
+               unsendMessage(null, cb);
+       }
+       
+       /**
+        * Remove a message by either PeerPacketMessage == or
+        * by MessageSendCallback ==
+        * Note that they have to be exactly the same object.
+        * Note also that it will remove the first object found then return.
+        */
+       public void unsendMessage(PeerPacketMessage pm, MessageSendCallback cb) {
                // FIXME: speed up - hashtable?
                synchronized(messages) {
-                       for(Iterator i = messages.listIterator(0); i.hasNext();) {
-                               PeerPacketMessage cmp = (PeerPacketMessage)(i.next());
-                               MessageSendCallback cmpCB = cmp.cb;
-                               if(cb == cmpCB) {
-                                       i.remove();
-                                       return;
+                       for(int type = 0; type < 2; type++) {
+                               Iterator i =
+                                       (type == 0 ? messages : messagesWithTrailers).
+                                       listIterator(0);
+                               for(; i.hasNext();) {
+                                       PeerPacketMessage cmp = 
(PeerPacketMessage)(i.next());
+                                       if((pm != null && cmp == pm) || 
+                                          (cb != null && cmp.cb == cb)) {
+                                               i.remove();
+                                               return;
+                                       }
                                }
                        }
                }
@@ -176,6 +207,7 @@
                this.node = n;
                this.maxPacketSize = maxPacketSize;
                messages = new LinkedList();
+               messagesWithTrailers = new LinkedList();
                connectionHandlers = new LinkedList();
     }
     
@@ -184,7 +216,7 @@
      * Call the callback provided when it is finished
      */
     public void sendMessageAsync(Message r, MessageSendCallback cb,
-                                                                                      
   int msgPrio)
+                                                                int msgPrio)
                throws SendFailedException {
                PeerPacketMessage pm = new PeerPacketMessage(id, r, cb, msgPrio);
                innerSendMessageAsync(pm);
@@ -254,7 +286,10 @@
                        return;
                }
                synchronized(messages) {
-                       messages.addLast(pm);
+                       if(pm.hasTrailer())
+                               messagesWithTrailers.addLast(pm);
+                       else
+                               messages.addLast(pm);
                }
                if(handlersSendingPackets > 0) {
                        // One of them will call us
@@ -265,7 +300,8 @@
                        // When it registers with OCM, it will getPacket()
                }
                Core.diagnostics.occurrenceContinuous("messageSendQueueSize",
-                                                                                      
   messages.size());
+                                                                                      
   messages.size()+
+                                                                                      
   messagesWithTrailers.size());
        }
     
     /**
@@ -301,12 +337,13 @@
         * starting a close dialog.
         * @throws IOException if the connection is already closed
      */
-    public synchronized PeerPacket getPacket(Link link, Presentation p,
+    public PeerPacket getPacket(Link link, Presentation p,
                                                                                       
  Identity i, Message m,
                                                                                       
  boolean onlyGivenMsg) 
                throws IOException {
                Core.diagnostics.occurrenceContinuous("messageSendQueueSize",
-                                                                                      
   messages.size());
+                                                                                      
   messages.size()+
+                                                                                      
   messagesWithTrailers.size());
                /** THE RULES
                 *
                 * Collect as many messages as will fit in one packet.
@@ -320,10 +357,18 @@
                        packetMessages.add(new PeerPacketMessage(i, m, null, NORMAL));
                }
                if(!onlyGivenMsg) {
+                       boolean msgsEmpty = false;
+                       boolean msgsWTEmpty = false;
                        synchronized(messages) {
-                               while(!messages.isEmpty()) {
-                                       PeerPacketMessage msg = 
-                                               
(PeerPacketMessage)(messages.removeFirst());
+                               while((!(msgsEmpty = messages.isEmpty())) ||
+                                         (!(msgsWTEmpty = 
messagesWithTrailers.isEmpty()))) {
+                                       PeerPacketMessage msg = null;
+                                       if(!msgsEmpty)
+                                               msg = 
(PeerPacketMessage)(messages.removeFirst());
+                                       if(msg == null && !msgsWTEmpty)
+                                               msg = (PeerPacketMessage)
+                                                       
(messagesWithTrailers.removeFirst());
+                                       if(msg == null) break;
                                        msg.resolve(p); // we need the length
                                        if((msg.getLength() > maxPacketSize) || 
                                           msg.hasTrailer()) {
@@ -333,9 +378,13 @@
                                        }
                                        if(msg.getLength() + packetLength >
                                           maxPacketSize && 
(!packetMessages.isEmpty())) {
-                                               messages.addFirst(msg);
+                                               if(msg.hasTrailer())
+                                                       
messagesWithTrailers.addFirst(msg);
+                                               else
+                                                       messages.addFirst(msg);
                                                break;
                                        }
+                                       packetMessages.addLast(msg);
                                        packetLength += msg.getLength();
                                }
                        }

Index: PeerPacket.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacket.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- PeerPacket.java     7 Oct 2003 00:47:30 -0000       1.5
+++ PeerPacket.java     8 Oct 2003 22:54:18 -0000       1.6
@@ -6,7 +6,7 @@
 class PeerPacket {
     Presentation p;
     PeerPacketMessage[] messages;
-    byte[] encryptedData; // plaintext. Encrypt at send time.
+    byte[] data; // plaintext. Encrypt at send time.
     int sentBytes;
     boolean hasTrailer = false;
     
@@ -29,25 +29,24 @@
                hasTrailer = true;
            }
        }
-       encryptedData = new byte[totalLength];
+       data = new byte[totalLength];
        int x = 0;
        for(int i=0;i<msgs.length;i++) {
            byte[] msgBytes = msgs[i].getContent();
-           System.arraycopy(msgBytes, 0, encryptedData, x, msgBytes.length);
+           System.arraycopy(msgBytes, 0, data, x, msgBytes.length);
            x += msgBytes.length;
        }
-       l.encryptBytes(encryptedData,0,encryptedData.length);
     }
     
     public int getLength() {
-       return encryptedData.length;
+       return data.length;
     }
     
     /**
      * @return the actual packet bytes, after encryption
      */
     public byte[] getBytes() {
-       return encryptedData;
+       return data;
     }
     
     public boolean hasTrailer() {
@@ -116,7 +115,7 @@
        }
        if(prev != null) {
            int prevLen = prev.getLength();
-           boolean success = (successfullySentBytes == encryptedData.length);
+           boolean success = (successfullySentBytes == data.length);
            if(finished || success) {
                if(success)
                    prev.notifySuccess(tw);

Index: PeerPacketMessage.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/PeerPacketMessage.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- PeerPacketMessage.java      7 Oct 2003 20:03:39 -0000       1.4
+++ PeerPacketMessage.java      8 Oct 2003 22:54:18 -0000       1.5
@@ -44,6 +44,10 @@
      * cached message.
      */
     public void resolve(Presentation p) {
+       if(Core.logger.shouldLog(Logger.DEBUG, this)) {
+           Core.logger.log(this, "resolve("+p+") for "+this,
+                           Logger.DEBUG);
+       }
        finished = false;
        if(this.p == p)
            return;
@@ -79,7 +83,7 @@
     }
     
     public boolean hasTrailer() {
-       return raw.trailingFieldLength > 0;
+       return msg.hasTrailer();
     }
     
     public long trailerLength() {

Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.416
retrieving revision 1.417
diff -u -r1.416 -r1.417
--- Version.java        7 Oct 2003 20:03:39 -0000       1.416
+++ Version.java        8 Oct 2003 22:54:18 -0000       1.417
@@ -18,7 +18,7 @@
     public static String protocolVersion = "1.46";
     
     /** The build number of the current revision */
-    public static final int buildNumber = 6223;
+    public static final int buildNumber = 6224;
     // 6028: may 3; ARK retrieval fix
 
     public static final int ignoreBuildsAfter = 6500;

_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to