Update of /cvsroot/freenet/freenet/src/freenet
In directory sc8-pr-cvs1:/tmp/cvs-serv13542/src/freenet

Modified Files:
        ConnectionHandler.java MessageSendCallback.java Version.java 
Added Files:
        PeerHandler.java PeerPacket.java PeerPacketMessage.java 
Log Message:
6179:
Fix zab's deadlocks.
Add setTrailerStream to MessageSendCallback. Now we can theoretically send any message 
fully async (we don't yet use it except in the below).
Add some new files, PeerHandler, PeerPacket, PeerPacketMessage, which will be used for 
a major refactoring of ConnectionHandler and separation of the message queue into a 
per-peer thing, rather than a per-connection thing. Not integrated yet, so should be 
harmless.
toString(), logging. Add log messages for long messageInitialStateTime's.
Add getCipher() to CipherOutputStream, for future use in refactoring and possibly 
elsewhere, and Link.encryptBytes(byte[]) for similar use.


--- NEW FILE: PeerHandler.java ---
package freenet;
import java.util.*;
import java.io.OutputStream;
import freenet.Core;
import freenet.node.Node;
import freenet.session.Link;
import freenet.support.Logger;

class PeerHandler {
    Peer peer;
    
    LinkedList messages;
    
    LinkedList connectionHandlers;
    
    Node node;
    
    int maxPacketSize;

    public PeerHandler(Peer p, Node n, int maxPacketSize) {
        this.peer = p;
        this.node = n;
        this.maxPacketSize = maxPacketSize;
    }
    
    /**
     * Start an asynchronous message send
     * Call the callback provided when it is finished
     */
    synchronized void sendMessageAsync(RawMessage r, MessageSendCallback cb) 
        throws SendFailedException {
        PeerPacketMessage pm = new PeerPacketMessage(peer, r, cb);
        throw new UnsupportedOperationException("PeerHandler.sendMessageAsync");
        /* Possibilities:
         *
         * 1. No ConnectionHandlers (without trailing fields) open.
         * Strategy: open one, then send message
         * 2. Idle ConnectionHandlers
         * Strategy: pick one, make it send a single message packet
         * 3. No Idle ConnectionHandlers
         * Strategy: wait for one to ask us for a packet
         */
//      int handlersSendingPackets = 0;
//      for(Iterator e = connectionHandlers.listIterator(0);
//          e.hasNext();) {
//          ConnectionHandler ch =
//              (ConnectionHandler)(e.next());
//          if(!ch.isOpen()) {
//              // Can't send messages
//              Core.logger.log(this, ch.toString()+" can't send messages",
//                              Logger.DEBUG);
//              e.remove();
//              continue;
//          }
// //       if(ch.isSendingPacket()) {
// //           handlersSendingPackets++;
// //       }
//          // Not busy!
//          sendSinglePacket(ch, pm);
//          return;
//          // No suitable ConnectionHandlers found!
//          // Queue the message
//          messages.addLast(pm);
//          if(handlersSendingPackets > 0) {
//              // One of them will call us
//          } else {
//              // Uh oh...
//              node.scheduleConnectionOpener(peer.getIdentity());
//              // When it registers with OCM, it will getPacket()
//          }
//      }
    }
    
    /**
     * Send one packet on one ConnectionHandler
     */
    protected void sendSinglePacket(ConnectionHandler ch, 
                                    PeerPacketMessage pm) {
        PeerPacketMessage[] msgs = new PeerPacketMessage[] { pm };
        PeerPacket packet = new PeerPacket(msgs, ch.getLink());
        //ch.sendPacket(packet);
        throw new UnsupportedOperationException("sendSinglePacket");
    }
    
    /**
     * Get a packet to send. Called by a ConnectionHandler.
     * It will then send it. synchronized so as to avoid isSendingPacket()
     * race with sendMessageAsync. Returns null if nothing to send.
     */
    public synchronized PeerPacket getPacket(Link link) {
        /** THE RULES
         *
         * Collect as many messages as will fit in one packet.
         * If a message is bigger than the maximum, add it on to the packet 
         * and send the oversized packet anyway.
         * Stop if a message includes a trailing field.
         */
        int packetLength = 0;
        LinkedList packetMessages = new LinkedList();
        while(!messages.isEmpty()) {
            PeerPacketMessage msg = 
                (PeerPacketMessage)(messages.removeFirst());
            if((msg.getLength() > maxPacketSize) || 
               msg.hasTrailer()) {
                packetMessages.addLast(msg);
                packetLength += msg.getLength();
                break;
            }
            if(msg.getLength() + packetLength >
               maxPacketSize && (!packetMessages.isEmpty())) {
                messages.addFirst(msg);
                break;
            }
            packetLength += msg.getLength();
        }
        if(packetMessages.isEmpty()) return null;
        else {
            PeerPacketMessage[] msgs = 
                new PeerPacketMessage[packetMessages.size()];
            packetMessages.toArray(msgs);
            return new PeerPacket(msgs, link);
        }
    }
    
    /**
     * Send a message synchronously
     * @return a stream to write the trailing message to, or null
     * if the message does not have a trailing field.
     */
    OutputStream sendMessage(RawMessage r) throws SendFailedException {
        MyMessageSendCallback cb =
            new MyMessageSendCallback();
        sendMessageAsync(r, cb);
        while(!cb.finished) {
            try {
                cb.wait();
            } catch (InterruptedException e) {}
        }
        if(cb.e != null) {
            if(cb.e instanceof SendFailedException) 
                throw (SendFailedException)(cb.e);
            else {
                Core.logger.log(this, "Got unexpected exception: "+
                                cb.e+" sending "+r, Logger.ERROR);
                SendFailedException e = 
                    new SendFailedException(peer.getAddress(), 
                                            peer.getIdentity(),
                                            "Unexpected exception "+
                                            cb.e, false);
                e.initCause(cb.e);
                throw e;
            }
        } else
            return cb.trailerStream;
    }
    
    class MyMessageSendCallback implements MessageSendCallback {
        boolean finished = false;
        Exception e = null;
        OutputStream trailerStream = null;
        public void succeeded() {
            finished = true;
            synchronized(this) {
                this.notify();
            }
        }
        
        public void thrown(Exception e) {
            this.e = e;
            finished = true;
            synchronized(this) {
                this.notify();
            }
        }
        
        public void setTrailerStream(OutputStream os) {
            trailerStream = os;
        }
    }
}

--- NEW FILE: PeerPacket.java ---
package freenet;
import freenet.session.Link;
import java.io.OutputStream;

class PeerPacket {
    PeerPacketMessage[] messages;
    byte[] encryptedData; // plaintext. Encrypt at send time.
    int sentBytes;
    
    PeerPacket(PeerPacketMessage[] msgs, Link l) 
        throws IllegalArgumentException {
        sentBytes = 0;
        messages = msgs;
        int totalLength = 0;
        for(int i=0;i<msgs.length;i++) {
            PeerPacketMessage m = msgs[i];
            totalLength += m.getLength();
            if((i != (msgs.length-1)) && m.hasTrailer())
                throw new IllegalArgumentException("trailers can only be attached to 
the LAST message!");
        }
        encryptedData = new byte[totalLength];
        int x = 0;
        for(int i=0;i<msgs.length;i++) {
            byte[] msgBytes = msgs[i].getContent();
            System.arraycopy(msgBytes, 0, encryptedData, x, msgBytes.length);
            x += msgBytes.length;
        }
        l.encryptBytes(encryptedData);
    }
    
    public int getLength() {
        return encryptedData.length;
    }
    
    /**
     * @return the actual packet bytes, after encryption
     */
    public byte[] getBytes() {
        return encryptedData;
    }
    
    /**
     * Notify clients of message send completion
     * @param finished whether the packet has finished sending. If false,
     * we are still sending it.
     * @param successfullySentBytes the number of bytes successfully sent.
     * Can be zero. Bytes after this number might have been sent, but we got
     * an error and can't be sure.
     */
    public void jobDone(boolean finished, int successfullySentBytes) {
        int msgStartOffset = 0;
        // Could unfold this a bit by keeping the offset in the array on the object
        PeerPacketMessage prev = null;
        for(int i=0;i<messages.length;i++) {
            if((!finished) && 
               msgStartOffset > successfullySentBytes) {
                prev = null;
                break;
            }
            if(msgStartOffset < sentBytes) {
                msgStartOffset += messages[i].getLength();
                prev = messages[i];
                continue;
            }
            // msgStartOffset >= sentBytes
            if(msgStartOffset == sentBytes) {
                // Last time sent a whole message and no more
                // Last message is dealt with
            } else {
                // msgStartOffset > sentBytes
                // Last message was not dealt with last time
                // prev != null because msgStartOffset > 0
                int prevLen = prev.getLength();
                int diff = msgStartOffset - sentBytes;
                if(finished || 
                   ((successfullySentBytes - msgStartOffset) > prevLen))
                    prev.notifyDone((successfullySentBytes - msgStartOffset) >
                                    prevLen);
            }
            prev = messages[i];
        }
        if(prev != null) {
            int prevLen = prev.getLength();
            boolean success = (successfullySentBytes == encryptedData.length);
            if(finished || success)
                prev.notifyDone(success);
        }
    }
}

--- NEW FILE: PeerPacketMessage.java ---
package freenet;
import freenet.Core;
import freenet.support.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

class PeerPacketMessage {
    final RawMessage raw;
    final byte[] content; // unencrypted - encrypt at PeerPacket level
    final MessageSendCallback cb;
    final Peer peer;
    static ByteArrayOutputStream bais = new ByteArrayOutputStream();
    
    public String toString() {
        return super.toString() + ":" + raw.toString() + ":" + cb;
    }
    
    public PeerPacketMessage(Peer p, RawMessage raw, MessageSendCallback cb) {
        this.peer = p;
        this.raw = raw;
        this.cb = cb;
        try {
            synchronized(bais) {
                bais.reset();
                raw.writeMessage(bais);
                bais.flush();
                this.content = bais.toByteArray();
            }
        } catch (IOException e) {
            Core.logger.log(this, "Impossible exception: "+e+
                            " writing message "+raw+","+cb+
                            " to BAIS", Logger.ERROR);
            throw new IllegalStateException("Impossible exception!: "+e);
        }
    }
    
    public byte[] getContent() {
        return content;
    }
    
    public int getLength() {
        return content.length;
    }
    
    public boolean hasTrailer() {
        return raw.trailingFieldLength > 0;
    }
    
    public long trailerLength() {
        return raw.trailingFieldLength;
    }
    
    public boolean isCloseMessage() {
        return raw.close;
    }
    
    /**
     * Notify the callback, after having sent (or failed to send) the message
     */
    public void notifyDone(boolean success) {
        if(cb == null) return;
        try {
            if(success) {
                cb.succeeded();
            } else {
                SendFailedException e =
                    new SendFailedException(peer.getAddress(), false);
                cb.thrown(e);
            }
        } catch (Throwable t) {
            Core.logger.log(this, toString()+".notifyDone() caught "+t,
                            t, Logger.ERROR);
        }
    }
}

Index: ConnectionHandler.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/ConnectionHandler.java,v
retrieving revision 1.148
retrieving revision 1.149
diff -u -r1.148 -r1.149
--- ConnectionHandler.java      8 Sep 2003 05:50:11 -0000       1.148
+++ ConnectionHandler.java      8 Sep 2003 17:03:02 -0000       1.149
@@ -897,12 +897,15 @@
                if (!status) {
                        //tell everybody they failed 
                        //this is where the PeerHandler will really help
+                       // Locking!
+                       MessageSend[] msgs = null;
                        synchronized(sendingQueue) {
-                               Iterator i = sendingQueue.iterator();
-                               while (i.hasNext()) {
-                                       MessageSend current = (MessageSend) i.next();
-                                       current.jobDone(0,false);
-                               }
+                               msgs = new MessageSend[sendingQueue.size()];
+                               sendingQueue.toArray(msgs);
+                       }
+                       for(int i=0;i<msgs.length;i++) {
+                               MessageSend current = msgs[i];
+                               current.jobDone(0,false);
                        }
                        //FIXME: we also need to iterate through the current packet
                        //not sure exactly how to do that.
@@ -2101,7 +2104,8 @@
                                                                        
ConnectionHandler.this+")", Logger.NORMAL);
                                if(cb != null) cb.thrown(sfe);
                        }
-                       
Core.diagnostics.occurrenceContinuous("messageSendQueueSize",sendingQueue.size());
+                       Core.diagnostics.occurrenceContinuous("messageSendQueueSize",
+                                                                                      
           sendingQueue.size());
                        //System.out.println("notified MessageSend");
                }
                
@@ -2805,8 +2809,11 @@
     public final LinkManager sessionType() {
         return peer.getLinkManager();
     }
-
-
+       
+       public final Link getLink() {
+               return link;
+       }
+       
     public final Address peerAddress() {
         return peer.getAddress();
     }

Index: MessageSendCallback.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/MessageSendCallback.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- MessageSendCallback.java    12 Aug 2003 02:41:57 -0000      1.1
+++ MessageSendCallback.java    8 Sep 2003 17:03:03 -0000       1.2
@@ -1,6 +1,8 @@
 package freenet;
 
 import freenet.support.ExceptionCallback;
+import java.io.OutputStream;
 
 public interface MessageSendCallback extends ExceptionCallback {
+    void setTrailerStream(OutputStream os); // must be called BEFORE the terminal 
success() or thrown()
 }

Index: Version.java
===================================================================
RCS file: /cvsroot/freenet/freenet/src/freenet/Version.java,v
retrieving revision 1.371
retrieving revision 1.372
diff -u -r1.371 -r1.372
--- Version.java        8 Sep 2003 14:10:18 -0000       1.371
+++ Version.java        8 Sep 2003 17:03:03 -0000       1.372
@@ -18,7 +18,7 @@
     public static String protocolVersion = "1.46";
     
     /** The build number of the current revision */
-    public static final int buildNumber = 6178;
+    public static final int buildNumber = 6179;
     // 6028: may 3; ARK retrieval fix
 
     public static final int ignoreBuildsAfter = 6500;

_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to