Author: toad
Date: 2005-11-15 21:10:36 +0000 (Tue, 15 Nov 2005)
New Revision: 7543

Added:
   trunk/freenet/src/freenet/io/xfer/AbortedException.java
   trunk/freenet/src/freenet/support/UpdatableSortedLinkedListKilledException.java
Modified:
   trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
   trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
   trunk/freenet/src/freenet/crypt/DiffieHellmanContext.java
   trunk/freenet/src/freenet/crypt/Yarrow.java
   trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
   trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBlock.java
   trunk/freenet/src/freenet/node/FNPPacketMangler.java
   trunk/freenet/src/freenet/node/InsertHandler.java
   trunk/freenet/src/freenet/node/KeyTracker.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/Version.java
   trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
   trunk/freenet/src/freenet/support/UpdatableSortedLinkedListWithForeignIndex.java
Log:
180:
LOTS of bug fixes and bug paranoia.
Inserts may complete now, but they often fail due to timeouts, even locally.

Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java     
2005-11-15 21:10:36 UTC (rev 7543)
@@ -47,8 +47,8 @@
        static final int MAX_SPLITFILE_BLOCKS_PER_SEGMENT = 1024;
        static final int MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 1536;
        // ~ 70kB/sec encode, 16MB segments
-       static final int SPLITFILE_BLOCKS_PER_SEGMENT = 512;
-       static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 768;
+       static final int SPLITFILE_BLOCKS_PER_SEGMENT = 128;
+       static final int SPLITFILE_CHECK_BLOCKS_PER_SEGMENT = 192;


        public HighLevelSimpleClientImpl(SimpleLowLevelClient client, 
ArchiveManager mgr, BucketFactory bf, RandomSource r) {

Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-12 
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-15 
21:10:36 UTC (rev 7543)
@@ -271,7 +271,7 @@
                Runtime.getRuntime().gc();
                Runtime.getRuntime().runFinalization();
                long memUsedAtStart = Runtime.getRuntime().totalMemory() - 
Runtime.getRuntime().freeMemory();
-               Logger.minor(this, "Memory in use at start: "+memUsedAtStart);
+               Logger.minor(this, "Memory in use at start: "+memUsedAtStart+" 
max="+Runtime.getRuntime().maxMemory());
                System.err.println("************* Encoding " + 
dataBlockStatus.length
                                + " -> " + checkBlockStatus.length + " 
*************");
                Logger.minor(this, "Doing encode: " + dataBlockStatus.length

Modified: trunk/freenet/src/freenet/crypt/DiffieHellmanContext.java
===================================================================
--- trunk/freenet/src/freenet/crypt/DiffieHellmanContext.java   2005-11-12 
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/crypt/DiffieHellmanContext.java   2005-11-15 
21:10:36 UTC (rev 7543)
@@ -88,7 +88,11 @@

     public synchronized void setOtherSideExponential(NativeBigInteger a) {
         lastUsedTime = System.currentTimeMillis();
-        if(peerExponential != null) throw new IllegalStateException("Assigned 
other side exponential twice");
+        if(peerExponential != null) {
+               if(!peerExponential.equals(a))
+                       throw new IllegalStateException("Assigned other side 
exponential twice");
+               else return;
+        }
         if(a == null) throw new NullPointerException();
         peerExponential = a;
     }

Modified: trunk/freenet/src/freenet/crypt/Yarrow.java
===================================================================
--- trunk/freenet/src/freenet/crypt/Yarrow.java 2005-11-12 22:23:39 UTC (rev 
7542)
+++ trunk/freenet/src/freenet/crypt/Yarrow.java 2005-11-15 21:10:36 UTC (rev 
7543)
@@ -535,6 +535,7 @@
        }

        private void fast_pool_reseed() {
+               long startTime = System.currentTimeMillis();
                byte[] v0 = fast_pool.digest();
                byte[] vi = v0;

@@ -550,6 +551,9 @@
                rekey(tmp);
                Arrays.fill(v0, (byte) 0); // blank out for security
                fast_entropy = 0;
+               long endTime = System.currentTimeMillis();
+               if(endTime - startTime > 5000)
+                       Logger.normal(this, "Fast pool reseed took 
"+(endTime-startTime)+"ms");
        }

        private void slow_pool_reseed() {

Modified: trunk/freenet/src/freenet/io/comm/UdpSocketManager.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/UdpSocketManager.java     2005-11-12 
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/io/comm/UdpSocketManager.java     2005-11-15 
21:10:36 UTC (rev 7543)
@@ -50,6 +50,7 @@
                super.start();
                Thread checker = new Thread(new USMChecker());
                checker.setDaemon(true);
+               checker.setPriority(Thread.MAX_PRIORITY);
                checker.start();
        }

@@ -88,44 +89,46 @@

        public void run() { // Listen for packets
                try {
-               while (_active) {
-                   try {
-                       DatagramPacket packet = getPacket();
-                       // Check for timedout _filters
-                       removeTimedOutFilters();
-                       // Check for matched _filters
-                       if(packet != null) {
-                           Peer peer = new Peer(packet.getAddress(), 
packet.getPort());
-                           byte[] data = packet.getData();
-                           int offset = packet.getOffset();
-                           int length = packet.getLength();
-                           if(lowLevelFilter != null) {
-                               try {
-                                   lowLevelFilter.process(data, offset, 
length, peer);
-                                   Logger.minor(this, "Successfully handled 
packet length "+length);
-                               } catch (Throwable t) {
-                                   Logger.error(this, "Caught "+t+" from 
"+lowLevelFilter, t);
-                               }
-                           }
-                           else {
-                               // Create a bogus context since no filter
-                               Message m = decodePacket(data, offset, length, 
new DummyPeerContext(peer));
-                               if(m != null)
-                                   checkFilters(m);
-                           }
-                       } else
-                               Logger.minor(this, "Null packet");
-                   } catch (Throwable t) {
-                       Logger.error(this, "Caught "+t, t);
-                   }
-               }
+                       while (/*_active*/true) {
+                               try {
+                                       DatagramPacket packet = getPacket();
+                                       // Check for timedout _filters
+                                       removeTimedOutFilters();
+                                       // Check for matched _filters
+                                       if (packet != null) {
+                                               Peer peer = new 
Peer(packet.getAddress(), packet.getPort());
+                                               byte[] data = packet.getData();
+                                               int offset = packet.getOffset();
+                                               int length = packet.getLength();
+                                               if (lowLevelFilter != null) {
+                                                       try {
+                                                               
lowLevelFilter.process(data, offset, length, peer);
+                                                               
Logger.minor(this,
+                                                                               
"Successfully handled packet length " + length);
+                                                       } catch (Throwable t) {
+                                                               
Logger.error(this, "Caught " + t + " from "
+                                                                               
+ lowLevelFilter, t);
+                                                       }
+                                               } else {
+                                                       // Create a bogus 
context since no filter
+                                                       Message m = 
decodePacket(data, offset, length,
+                                                                       new 
DummyPeerContext(peer));
+                                                       if (m != null)
+                                                               checkFilters(m);
+                                               }
+                                       } else
+                                               Logger.minor(this, "Null 
packet");
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught " + t, t);
+                               }
+                       }
                } finally {
-               Logger.error(this, "run() exiting");
-               synchronized (this) {
-                       _isDone = true;
-                       notifyAll();
+                       Logger.error(this, "run() exiting");
+                       synchronized (this) {
+                               _isDone = true;
+                               notifyAll();
+                       }
                }
-               }
        }

        /**

Added: trunk/freenet/src/freenet/io/xfer/AbortedException.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/AbortedException.java     2005-11-12 
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/io/xfer/AbortedException.java     2005-11-15 
21:10:36 UTC (rev 7543)
@@ -0,0 +1,13 @@
+package freenet.io.xfer;
+
+/**
+ * Thrown when a transfer is aborted, and caller tries to do something on PRB,
+ * in order to avoid some races.
+ */
+public class AbortedException extends Exception {
+
+       public AbortedException(String msg) {
+               super(msg);
+       }
+
+}

Modified: trunk/freenet/src/freenet/io/xfer/BlockReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2005-11-12 
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/io/xfer/BlockReceiver.java        2005-11-15 
21:10:36 UTC (rev 7543)
@@ -135,6 +135,10 @@
                return _prb.getBlock();
                } catch(NotConnectedException e) {
                    throw new 
RetrievalException(RetrievalException.SENDER_DISCONNECTED);
+               } catch(AbortedException e) {
+                       // We didn't cause it?!
+                       Logger.error(this, "Caught in receive - probably a bug 
as receive sets it: "+e);
+                       throw new 
RetrievalException(RetrievalException.UNKNOWN);
                }
        }
 }

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-12 
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2005-11-15 
21:10:36 UTC (rev 7543)
@@ -56,7 +56,12 @@
                _destination = destination;
                _uid = uid;
                _prb = source;
-               _sentPackets = new BitArray(_prb.getNumPackets());
+               try {
+                       _sentPackets = new BitArray(_prb.getNumPackets());
+               } catch (AbortedException e) {
+                       Logger.error(this, "Aborted during setup");
+                       // Will throw on running
+               }
        }

        public boolean send() {
@@ -100,11 +105,15 @@
                                                } catch (NotConnectedException 
e) {
                                                    Logger.normal(this, 
"Terminating send: "+e);
                                                    _sendComplete = true;
+                                               } catch (AbortedException e) {
+                                                       Logger.normal(this, 
"Terminating send due to abort: "+e);
+                                                       _sendComplete = true;
                                                }
                                }
                        }
                };

+               try {
                _unsent = _prb.addListener(new 
PartiallyReceivedBlock.PacketReceivedListener() {;

                        public void packetReceived(int packetNo) {
@@ -164,7 +173,12 @@
                            // Terminated abnormally
                            return false;
                        }
-               }               
+               }
+               } catch (AbortedException e) {
+                       // Terminate
+                       _sendComplete = true;
+                       return false;
+               }
        }

        public int getNumSent() {

Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBlock.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBlock.java       
2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBlock.java       
2005-11-15 21:10:36 UTC (rev 7543)
@@ -61,9 +61,9 @@
                _packetSize = packetSize;
        }

-       public synchronized LinkedList addListener(PacketReceivedListener 
listener) {
+       public synchronized LinkedList addListener(PacketReceivedListener 
listener) throws AbortedException {
                if (_aborted) {
-                       throw new RuntimeException("Adding listener to aborted 
PRB");
+                       throw new AbortedException("Adding listener to aborted 
PRB");
                }
                _packetReceivedListeners.add(listener);
                LinkedList ret = new LinkedList();
@@ -75,30 +75,30 @@
                return ret;
        }

-       public synchronized boolean isReceived(int packetNo) {
+       public synchronized boolean isReceived(int packetNo) throws 
AbortedException {
                if (_aborted) {
-                       throw new RuntimeException("PRB is aborted");
+                       throw new AbortedException("PRB is aborted");
                }
                return _received[packetNo];
        }

-       public synchronized int getNumPackets() {
+       public synchronized int getNumPackets() throws AbortedException {
                if (_aborted) {
-                       throw new RuntimeException("PRB is aborted");
+                       throw new AbortedException("PRB is aborted");
                }
                return _packets;
        }

-       public synchronized int getPacketSize() {
+       public synchronized int getPacketSize() throws AbortedException {
                if (_aborted) {
-                       throw new RuntimeException("PRB is aborted");
+                       throw new AbortedException("PRB is aborted");
                }
                return _packetSize;
        }

-       public synchronized void addPacket(int position, Buffer packet) {
+       public synchronized void addPacket(int position, Buffer packet) throws 
AbortedException {
                if (_aborted) {
-                       throw new RuntimeException("PRB is aborted");
+                       throw new AbortedException("PRB is aborted");
                }
                if (packet.getLength() != _packetSize) {
                        throw new RuntimeException("New packet size 
"+packet.getLength()+" but expecting packet of size "+_packetSize);
@@ -114,16 +114,16 @@
                }
        }

-       public boolean allReceived() {
+       public boolean allReceived() throws AbortedException {
                if (_aborted) {
-                       throw new RuntimeException("PRB is aborted");
+                       throw new AbortedException("PRB is aborted");
                }
                return _receivedCount == _packets;
        }

-       public byte[] getBlock() {
+       public byte[] getBlock() throws AbortedException {
                if (_aborted) {
-                       throw new RuntimeException("PRB is aborted");
+                       throw new AbortedException("PRB is aborted");
                }
                if (!allReceived()) {
                        throw new RuntimeException("Tried to get block before 
all packets received");
@@ -131,9 +131,9 @@
                return _data;
        }

-       public Buffer getPacket(int x) {
+       public Buffer getPacket(int x) throws AbortedException {
                if (_aborted) {
-                       throw new RuntimeException("PRB is aborted");
+                       throw new AbortedException("PRB is aborted");
                }
                return new Buffer(_data, x * _packetSize, _packetSize);
        }

Modified: trunk/freenet/src/freenet/node/FNPPacketMangler.java
===================================================================
--- trunk/freenet/src/freenet/node/FNPPacketMangler.java        2005-11-12 
22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/node/FNPPacketMangler.java        2005-11-15 
21:10:36 UTC (rev 7543)
@@ -338,7 +338,7 @@
         output[1] = (byte) negType;
         output[2] = (byte) phase;
         System.arraycopy(data, 0, output, 3, data.length);
-        Logger.minor(this, "Sending auth packet to "+replyTo+" - 
version="+version+" negType="+negType+" phase="+phase+" 
data.length="+data.length+" for "+pn);
+        Logger.minor(this, "Sending auth packet ("+version+","+negType+" to 
"+replyTo+" - version="+version+" negType="+negType+" phase="+phase+" 
data.length="+data.length+" for "+pn);
         sendAuthPacket(output, pn, replyTo);
     }

@@ -369,8 +369,8 @@
         pcfb.blockEncipher(output, 0, output.length);
         System.arraycopy(output, 0, data, hash.length+iv.length+2, 
output.length);
         usm.sendPacket(data, replyTo);
-        Logger.minor(this, "Sending auth packet to "+replyTo+" - size 
"+data.length+" data length: "+output.length);
-    }
+        Logger.minor(this, "Sending auth packet (long) to "+replyTo+" - size 
"+data.length+" data length: "+output.length);
+     }

     /**
      * @param i
@@ -692,18 +692,19 @@

         int ackCount = decrypted[ptr++] & 0xff;
         Logger.minor(this, "Acks: "+ackCount);
-        
+
+        int[] acks = new int[ackCount];
         for(int i=0;i<ackCount;i++) {
             int offset = decrypted[ptr++] & 0xff;
             if(ptr > decrypted.length) {
                 Logger.error(this, "Packet not long enough at byte "+ptr+" on 
"+tracker);
                 return;
             }
-            int realSeqNo = referenceSeqNumber - offset;
-            Logger.minor(this, "ACK: "+realSeqNo);
-            tracker.acknowledgedPacket(realSeqNo);
+            acks[i] = referenceSeqNumber - offset;
         }

+        tracker.acknowledgedPackets(acks);
+        
         int retransmitCount = decrypted[ptr++] & 0xff;
         Logger.minor(this, "Retransmit requests: "+retransmitCount);

@@ -1062,6 +1063,10 @@

         // We do not support forgotten packets at present

+        int[] acks = tracker.grabAcks();
+        int[] resendRequests = tracker.grabResendRequests();
+        int[] ackRequests = tracker.grabAckRequests();
+        
         // Allocate a sequence number
         int seqNumber;
         if(packetNumber > 0)
@@ -1076,10 +1081,6 @@

         Logger.minor(this, "Sequence number (sending): "+seqNumber+" 
("+packetNumber+") to "+tracker.pn.getPeer());

-        int[] acks = tracker.grabAcks();
-        int[] resendRequests = tracker.grabResendRequests();
-        int[] ackRequests = tracker.grabAckRequests();
-        
         int packetLength = acks.length + resendRequests.length + 
ackRequests.length + 4 + 1 + length + 4 + 4 + RANDOM_BYTES_LENGTH;
         if(packetNumber == -1) packetLength += 4;
         else packetLength++;
@@ -1096,6 +1097,7 @@
         plaintext[ptr++] = (byte)seqNumber;

         node.random.nextBytes(randomJunk);
+        Logger.minor(this, "Got random junk");
         System.arraycopy(randomJunk, 0, plaintext, ptr, RANDOM_BYTES_LENGTH);
         ptr += RANDOM_BYTES_LENGTH;


Modified: trunk/freenet/src/freenet/node/InsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/InsertHandler.java   2005-11-12 22:23:39 UTC 
(rev 7542)
+++ trunk/freenet/src/freenet/node/InsertHandler.java   2005-11-15 21:10:36 UTC 
(rev 7543)
@@ -6,6 +6,7 @@
 import freenet.io.comm.MessageFilter;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.RetrievalException;
+import freenet.io.xfer.AbortedException;
 import freenet.io.xfer.BlockReceiver;
 import freenet.io.xfer.PartiallyReceivedBlock;
 import freenet.keys.CHKBlock;
@@ -195,14 +196,17 @@
     private void finish() {
         Message toSend = null;
         synchronized(this) { // REDFLAG do not use synch(this) for any other 
purpose!
-            if(!canCommit) return;
-            if(!prb.allReceived()) return;
             try {
+                if(!canCommit) return;
+                if(!prb.allReceived()) return;
                 CHKBlock block = new CHKBlock(prb.getBlock(), headers, key);
                 node.store(block);
             } catch (CHKVerifyException e) {
                 Logger.error(this, "Verify failed in InsertHandler: "+e+" - 
headers: "+HexUtil.bytesToHex(headers), e);
                 toSend = DMT.createFNPDataInsertRejected(uid, 
DMT.DATA_INSERT_REJECTED_VERIFY_FAILED);
+            } catch (AbortedException e) {
+               Logger.error(this, "Receive failed: "+e);
+               // Receiver thread will handle below
             }
         }
         if(toSend != null) {

Modified: trunk/freenet/src/freenet/node/KeyTracker.java
===================================================================
--- trunk/freenet/src/freenet/node/KeyTracker.java      2005-11-12 22:23:39 UTC 
(rev 7542)
+++ trunk/freenet/src/freenet/node/KeyTracker.java      2005-11-15 21:10:36 UTC 
(rev 7543)
@@ -13,6 +13,7 @@
 import freenet.support.LimitedRangeIntByteArrayMapElement;
 import freenet.support.Logger;
 import freenet.support.UpdatableSortedLinkedListItem;
+import freenet.support.UpdatableSortedLinkedListKilledException;
 import freenet.support.UpdatableSortedLinkedListWithForeignIndex;
 import freenet.support.WouldBlockException;
 import freenet.support.DoublyLinkedList.Item;
@@ -208,7 +209,7 @@
         long activeTime;
         final Integer packetNumberAsInteger;

-        void sent() {
+        void sent() throws UpdatableSortedLinkedListKilledException {
             long now = System.currentTimeMillis();
             activeTime = now + 500;
             urgentTime = activeTime + 200;
@@ -285,7 +286,7 @@
             super(packetNumber);
         }

-        void sent() {
+        void sent() throws UpdatableSortedLinkedListKilledException {
             synchronized(resendRequestQueue) {
                 super.sent();
                 resendRequestQueue.update(this);
@@ -307,7 +308,7 @@
             }
         }

-        void sent() {
+        void sent() throws UpdatableSortedLinkedListKilledException {
             synchronized(ackRequestQueue) {
                 super.sent();
                 ackRequestQueue.update(this);
@@ -319,7 +320,8 @@
      * Called when we receive a packet.
      * @param seqNumber The packet's serial number.
      */
-    public synchronized void receivedPacket(int seqNumber) {
+    public void receivedPacket(int seqNumber) {
+       Logger.minor(this, "Received packet "+seqNumber);
         try {
                        pn.receivedPacket();
                } catch (NotConnectedException e) {
@@ -327,39 +329,49 @@
                        return;
                }
         if(seqNumber == -1) return;
+        // FIXME delete this log statement
+        Logger.minor(this, "Still received packet: "+seqNumber);
+        // Received packet
         receivedPacketNumber(seqNumber);
-        if(packetNumbersReceived.contains(seqNumber)) {
-            // They resent it
-            // Lets re-ack it.
-            queueAck(seqNumber);
-        } else {
-            receivedPacketNumber(seqNumber);
-            queueAck(seqNumber);
-        }
+        // Ack it even if it is a resend
+        queueAck(seqNumber);
     }

-    protected synchronized void receivedPacketNumber(int seqNumber) {
+    protected void receivedPacketNumber(int seqNumber) {
+       Logger.minor(this, "Handling received packet number "+seqNumber);
         queueResendRequests(seqNumber);
         packetNumbersReceived.got(seqNumber);
-        removeResendRequest(seqNumber);
-        highestSeenIncomingSerialNumber = 
Math.max(highestSeenIncomingSerialNumber, seqNumber);
+        try {
+                       removeResendRequest(seqNumber);
+               } catch (UpdatableSortedLinkedListKilledException e) {
+                       // Ignore, not our problem
+               }
+        synchronized(this) {
+               highestSeenIncomingSerialNumber = 
Math.max(highestSeenIncomingSerialNumber, seqNumber);
+        }
+        Logger.minor(this, "Handled received packet number "+seqNumber);
     }

     /**
      * Remove a resend request from the queue.
      * @param seqNumber
+     * @throws UpdatableSortedLinkedListKilledException 
      */
-    private void removeResendRequest(int seqNumber) {
-        resendRequestQueue.removeByKey(new Integer(seqNumber));
+    private void removeResendRequest(int seqNumber) throws 
UpdatableSortedLinkedListKilledException {
+       resendRequestQueue.removeByKey(new Integer(seqNumber));
     }

     /**
      * Add some resend requests if necessary.
      * @param seqNumber The number of the packet we just received.
      */
-    private synchronized void queueResendRequests(int seqNumber) {
-        int max = packetNumbersReceived.highest();
+    private void queueResendRequests(int seqNumber) {
+        int max;
+        synchronized(this) {
+               max = packetNumbersReceived.highest();
+        }
         if(seqNumber > max) {
+               try {
             if(max != -1 && seqNumber - max > 1) {
                 Logger.minor(this, "Queueing resends from "+max+" to 
"+seqNumber);
                 // Missed some packets out
@@ -367,6 +379,9 @@
                     queueResendRequest(i);
                 }
             }
+               } catch (UpdatableSortedLinkedListKilledException e) {
+                       // Ignore (we are decoding packet, not sending one)
+               }
         }
     }

@@ -374,23 +389,27 @@
      * Queue a resend request
      * @param packetNumber the packet serial number to queue a
      * resend request for
+     * @throws UpdatableSortedLinkedListKilledException 
      */
-    private void queueResendRequest(int packetNumber) {
-        if(queuedResendRequest(packetNumber)) {
-            Logger.minor(this, "Not queueing resend request for 
"+packetNumber+" - already queued");
-            return;
-        }
-        Logger.minor(this, "Queueing resend request for "+packetNumber);
-        QueuedResendRequest qrr = new QueuedResendRequest(packetNumber);
-        resendRequestQueue.add(qrr);
+    private void queueResendRequest(int packetNumber) throws 
UpdatableSortedLinkedListKilledException {
+       synchronized(resendRequestQueue) {
+               if(queuedResendRequest(packetNumber)) {
+                       Logger.minor(this, "Not queueing resend request for 
"+packetNumber+" - already queued");
+                       return;
+               }
+               Logger.minor(this, "Queueing resend request for "+packetNumber);
+               QueuedResendRequest qrr = new QueuedResendRequest(packetNumber);
+               resendRequestQueue.add(qrr);
+       }
     }

     /**
      * Queue an ack request
      * @param packetNumber the packet serial number to queue a
      * resend request for
+     * @throws UpdatableSortedLinkedListKilledException 
      */
-    private void queueAckRequest(int packetNumber) {
+    private void queueAckRequest(int packetNumber) throws 
UpdatableSortedLinkedListKilledException {
         synchronized(ackRequestQueue) {
             if(queuedAckRequest(packetNumber)) {
                 Logger.minor(this, "Not queueing ack request for 
"+packetNumber+" - already queued");
@@ -417,26 +436,63 @@
     }

     /**
+     * Called when we have received several packet acknowledgements.
+     */
+    public void acknowledgedPackets(int[] seqNos) {
+       AsyncMessageCallback[][] callbacks = new 
AsyncMessageCallback[seqNos.length][];
+               for(int i=0;i<seqNos.length;i++) {
+                       int realSeqNo = seqNos[i];
+               Logger.minor(this, "Acknowledged packet (synced): "+realSeqNo);
+            try {
+                               removeAckRequest(realSeqNo);
+                       } catch (UpdatableSortedLinkedListKilledException e) {
+                               // Ignore, we are processing an incoming packet
+                       }
+            Logger.minor(this, "Removed ack request");
+            callbacks[i] = sentPacketsContents.getCallbacks(realSeqNo);
+            sentPacketsContents.remove(realSeqNo);
+               }
+       int cbCount = 0;
+       for(int i=0;i<callbacks.length;i++) {
+               AsyncMessageCallback[] cbs = callbacks[i];
+               if(cbs != null) {
+                       for(int j=0;j<cbs.length;j++) {
+                               cbs[j].acknowledged();
+                               cbCount++;
+                       }
+               }
+       }
+       if(cbCount > 0)
+               Logger.minor(this, "Executed "+cbCount+" callbacks");
+    }
+    
+    /**
      * Called when we have received a packet acknowledgement.
      * @param realSeqNo
      */
     public void acknowledgedPacket(int realSeqNo) {
         AsyncMessageCallback[] callbacks;
-        synchronized(this) {
-            removeAckRequest(realSeqNo);
-            callbacks = sentPacketsContents.getCallbacks(realSeqNo);
-            sentPacketsContents.remove(realSeqNo);
-        }
+               Logger.minor(this, "Acknowledged packet (synced): "+realSeqNo);
+        try {
+                       removeAckRequest(realSeqNo);
+               } catch (UpdatableSortedLinkedListKilledException e) {
+                       // Ignore, we are processing an incoming packet
+               }
+        Logger.minor(this, "Removed ack request");
+        callbacks = sentPacketsContents.getCallbacks(realSeqNo);
+        sentPacketsContents.remove(realSeqNo);
         if(callbacks != null) {
             for(int i=0;i<callbacks.length;i++)
                 callbacks[i].acknowledged();
+            Logger.minor(this, "Executed "+callbacks.length+" callbacks");
         }
     }

     /**
      * Remove an ack request from the queue by packet number.
+     * @throws UpdatableSortedLinkedListKilledException 
      */
-    private void removeAckRequest(int seqNo) {
+    private void removeAckRequest(int seqNo) throws 
UpdatableSortedLinkedListKilledException {
         ackRequestQueue.removeByKey(new Integer(seqNo));
     }

@@ -453,13 +509,15 @@
             }
             pn.node.ps.queuedResendPacket();
         } else {
-            String msg = "Asking me to resend packet "+seqNumber+
-                       " which we haven't sent yet or which they have already acked (next="+nextPacketNumber+")";
-            // Can have a race condition
-            if(seqNumber < nextPacketNumber && seqNumber > nextPacketNumber-64)
-                Logger.minor(this, msg);
-            else
-                Logger.error(this, msg);
+               synchronized(this) {
+                       String msg = "Asking me to resend packet "+seqNumber+
+                               " which we haven't sent yet or which they have already acked (next="+nextPacketNumber+")";
+                       // Can have a race condition
+                       if(seqNumber < nextPacketNumber && seqNumber > nextPacketNumber-64)
+                               Logger.minor(this, msg);
+                       else
+                               Logger.error(this, msg);
+               }
         }
     }

@@ -478,7 +536,11 @@
                 queueAck(packetNumber);
             } else {
                 // We have not received it, so get them to resend it
-                queueResendRequest(packetNumber);
+                try {
+                                       queueResendRequest(packetNumber);
+                               } catch (UpdatableSortedLinkedListKilledException e) {
+                                       // Ignore, we are decoding, not sending.
+                               }
                 synchronized(this) {
                     highestSeenIncomingSerialNumber = Math.max(highestSeenIncomingSerialNumber, packetNumber);
                 }
@@ -514,7 +576,11 @@
         } else {
             Logger.error(this, "Destination forgot packet: "+seqNumber);
         }
-        removeResendRequest(seqNumber);
+        try {
+                       removeResendRequest(seqNumber);
+               } catch (UpdatableSortedLinkedListKilledException e) {
+                       // Ignore
+               }
     }

     /**
@@ -584,12 +650,14 @@
     /**
      * Grab all the currently queued resend requests.
      * @return An array of the packet numbers of all the packets we want to be resent.
+     * @throws NotConnectedException If the peer is no longer connected.
      */
-    public int[] grabResendRequests() {
+    public int[] grabResendRequests() throws NotConnectedException {
         UpdatableSortedLinkedListItem[] items;
         int[] packetNumbers;
         int realLength;
         long now = System.currentTimeMillis();
+        try {
         synchronized(resendRequestQueue) {
             items = resendRequestQueue.toArray();
             int length = items.length;
@@ -597,6 +665,11 @@
             realLength = 0;
             for(int i=0;i<length;i++) {
                 QueuedResendRequest qrr = (QueuedResendRequest)items[i];
+                if(packetNumbersReceived.contains(qrr.packetNumber)) {
+                       Logger.minor(this, "Have already seen "+qrr.packetNumber+", removing from resend list");
+                       resendRequestQueue.remove(qrr);
+                       continue;
+                }
                 if(qrr.activeTime <= now) {
                     packetNumbers[realLength++] = qrr.packetNumber;
                     Logger.minor(this, "Grabbing resend request: "+qrr.packetNumber+" from "+this);
@@ -606,40 +679,45 @@
                 }
             }
         }
+        } catch (UpdatableSortedLinkedListKilledException e) {
+               throw new NotConnectedException();
+        }
         int[] trimmedPacketNumbers = new int[realLength];
         System.arraycopy(packetNumbers, 0, trimmedPacketNumbers, 0, realLength);
         return trimmedPacketNumbers;
     }

-    public int[] grabAckRequests() {
+    public int[] grabAckRequests() throws NotConnectedException {
         UpdatableSortedLinkedListItem[] items;
         int[] packetNumbers;
         int realLength;
         long now = System.currentTimeMillis();
-        synchronized(this) {
-            synchronized(ackRequestQueue) {
-                items = ackRequestQueue.toArray();
-                int length = items.length;
-                packetNumbers = new int[length];
-                realLength = 0;
-                for(int i=0;i<length;i++) {
-                    QueuedAckRequest qrr = (QueuedAckRequest)items[i];
-                    int packetNumber = qrr.packetNumber;
-                    if(qrr.activeTime <= now) {
-                        if(sentPacketsContents.get(packetNumber) == null) {
-                            Logger.minor(this, "Asking to ack packet which has already been acked: "+packetNumber+" on "+this+".grabAckRequests");
-                            ackRequestQueue.remove(qrr);
-                            continue;
-                        }
-                        packetNumbers[realLength++] = packetNumber;
-                        Logger.minor(this, "Grabbing ack request "+packetNumber+" from "+this);
-                        qrr.sent();
-                    } else {
-                        Logger.minor(this, "Ignoring ack request "+packetNumber+" - will become active in "+(qrr.activeTime-now)+" ms on "+this);
+        try {
+        synchronized(ackRequestQueue) {
+            items = ackRequestQueue.toArray();
+            int length = items.length;
+            packetNumbers = new int[length];
+            realLength = 0;
+            for(int i=0;i<length;i++) {
+                QueuedAckRequest qr = (QueuedAckRequest)items[i];
+                int packetNumber = qr.packetNumber;
+                if(qr.activeTime <= now) {
+                    if(sentPacketsContents.get(packetNumber) == null) {
+                        Logger.minor(this, "Asking to ack packet which has already been acked: "+packetNumber+" on "+this+".grabAckRequests");
+                        ackRequestQueue.remove(qr);
+                        continue;
                     }
+                    packetNumbers[realLength++] = packetNumber;
+                    Logger.minor(this, "Grabbing ack request "+packetNumber+" from "+this);
+                    qr.sent();
+                } else {
+                    Logger.minor(this, "Ignoring ack request "+packetNumber+" - will become active in "+(qr.activeTime-now)+" ms on "+this);
                 }
             }
         }
+        } catch (UpdatableSortedLinkedListKilledException e) {
+               throw new NotConnectedException();
+        }
         int[] trimmedPacketNumbers = new int[realLength];
         System.arraycopy(packetNumbers, 0, trimmedPacketNumbers, 0, realLength);
         return trimmedPacketNumbers;
@@ -685,8 +763,9 @@
      * Report a packet has been sent
      * @param data The data we have just sent (payload only, decrypted). 
      * @param seqNumber The packet number.
+     * @throws NotConnectedException 
      */
-    public void sentPacket(byte[] data, int seqNumber, AsyncMessageCallback[] callbacks) {
+    public void sentPacket(byte[] data, int seqNumber, AsyncMessageCallback[] callbacks) throws NotConnectedException {
         if(callbacks != null) {
             for(int i=0;i<callbacks.length;i++) {
                 if(callbacks[i] == null)
@@ -694,7 +773,11 @@
             }
         }
         sentPacketsContents.add(seqNumber, data, callbacks);
-        queueAckRequest(seqNumber);
+        try {
+                       queueAckRequest(seqNumber);
+               } catch (UpdatableSortedLinkedListKilledException e) {
+                       throw new NotConnectedException();
+               }
     }

     public void completelyDeprecated(KeyTracker newTracker) {
@@ -741,8 +824,8 @@
         synchronized(ackQueue) {
             ackQueue.clear();
         }
-        resendRequestQueue.clear();
-        ackRequestQueue.clear();
+        resendRequestQueue.kill();
+        ackRequestQueue.kill();
         synchronized(packetsToResend) {
             packetsToResend.clear();
         }

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java    2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/node/Node.java    2005-11-15 21:10:36 UTC (rev 7543)
@@ -37,6 +37,7 @@
 import freenet.io.comm.Peer;
 import freenet.io.comm.PeerParseException;
 import freenet.io.comm.UdpSocketManager;
+import freenet.io.xfer.AbortedException;
 import freenet.io.xfer.PartiallyReceivedBlock;
 import freenet.keys.CHKBlock;
 import freenet.keys.CHKVerifyException;
@@ -377,7 +378,10 @@
             } catch (CHKVerifyException e) {
                 Logger.error(this, "Does not verify: "+e, e);
                 throw new LowLevelGetException(LowLevelGetException.DECODE_FAILED);                
-            }
+            } catch (AbortedException e) {
+               Logger.error(this, "Impossible: "+e, e);
+               throw new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
+                       }
         } else {
                switch(rs.getStatus()) {
                case RequestSender.NOT_FINISHED:

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2005-11-15 21:10:36 UTC (rev 7543)
@@ -883,28 +883,34 @@
      * Requeue ResendPacketItem[]s if they are not sent.
      * @param resendItems
      */
-    public synchronized void requeueResendItems(ResendPacketItem[] resendItems) {
+    public void requeueResendItems(ResendPacketItem[] resendItems) {
+       KeyTracker cur, prev, unv;
+       synchronized(this) {
+               cur = currentTracker;
+               prev = previousTracker;
+               unv = unverifiedTracker;
+       }
         for(int i=0;i<resendItems.length;i++) {
             ResendPacketItem item = resendItems[i];
             if(item.pn != this)
                 throw new IllegalArgumentException("item.pn != this!");
-            KeyTracker kt = currentTracker;
+            KeyTracker kt = cur;
             if(kt != null && item.kt == kt) {
                 kt.resendPacket(item.packetNumber);
                 continue;
             }
-            kt = previousTracker;
+            kt = prev;
             if(kt != null && item.kt == kt) {
                 kt.resendPacket(item.packetNumber);
                 continue;
             }
-            kt = unverifiedTracker;
+            kt = unv;
             if(kt != null && item.kt == kt) {
                 kt.resendPacket(item.packetNumber);
                 continue;
             }
             // Doesn't match any of these, need to resend the data
-            kt = currentTracker == null ? unverifiedTracker : currentTracker;
+            kt = cur == null ? unv : cur;
             if(kt == null) {
                 Logger.error(this, "No tracker to resend packet "+item.packetNumber+" on");
                 continue;

Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-15 21:10:36 UTC (rev 7543)
@@ -20,10 +20,10 @@
        public static final String protocolVersion = "1.0";

        /** The build number of the current revision */
-       public static final int buildNumber = 179;
+       public static final int buildNumber = 180;

        /** Oldest build of Fred we will talk to */
-       public static final int lastGoodBuild = 178;
+       public static final int lastGoodBuild = 180;

        /** The highest reported build of fred */
        public static int highestSeenBuild = buildNumber;

Modified: trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java
===================================================================
--- trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java    2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/support/UpdatableSortedLinkedList.java    2005-11-15 21:10:36 UTC (rev 7543)
@@ -11,13 +11,16 @@
  */
 public class UpdatableSortedLinkedList {

+       private boolean killed = false;
+       
     public UpdatableSortedLinkedList() {
         list = new DoublyLinkedListImpl();
     }

     private final DoublyLinkedList list;

-    public synchronized void add(UpdatableSortedLinkedListItem i) {
+    public synchronized void add(UpdatableSortedLinkedListItem i) throws UpdatableSortedLinkedListKilledException {
+       if(killed) throw new UpdatableSortedLinkedListKilledException();
         Logger.minor(this, "Add("+i+") on "+this);
         if(list.isEmpty()) {
             list.push(i);
@@ -58,34 +61,32 @@
        int statedLength = list.size();
        int realLength = 0;
        sb.setLength(0);
-       int x = 0;
        for(Enumeration e = list.elements();e.hasMoreElements();) {
                UpdatableSortedLinkedListItem i = (UpdatableSortedLinkedListItem) e.nextElement();
-               sb.append(x);
-               sb.append("=");
-               sb.append(i);
-               sb.append('\n');
+               // Sanity check for infinite looping 
+               if(realLength > 100*1000)
+                       Logger.normal(this, "["+realLength+"] = "+i+" (prev="+i.getPrev()+")");
                realLength++;
        }
        if(statedLength != realLength) {
                String err = "statedLength = "+statedLength+" but realLength = "+realLength+" on "+this;
                Logger.error(this, "Illegal ERROR: "+err, new Exception("error"));
-               Logger.error(this, "Details:\n"+sb.toString());
                throw new IllegalStateException(err);
        } else {
                Logger.minor(this, "checkList() successful: realLength = statedLength = "+realLength+" on "+this);
-               Logger.minor(this, "Details:\n"+sb.toString());
        }
        }

-       public synchronized void remove(UpdatableSortedLinkedListItem i) {
+       public synchronized void remove(UpdatableSortedLinkedListItem i) throws UpdatableSortedLinkedListKilledException {
+       if(killed) throw new UpdatableSortedLinkedListKilledException();
         Logger.minor(this, "Remove("+i+") on "+this);
         checkList();
         list.remove(i);
         checkList();
     }

-    public synchronized void update(UpdatableSortedLinkedListItem i) {
+    public synchronized void update(UpdatableSortedLinkedListItem i) throws UpdatableSortedLinkedListKilledException {
+       if(killed) throw new UpdatableSortedLinkedListKilledException();
         Logger.minor(this, "Update("+i+") on "+this);
         checkList();
         if(i.compareTo(list.tail()) > 0) {
@@ -160,8 +161,10 @@

     /**
      * Dump the current status of the list to the log.
+     * @throws UpdatableSortedLinkedListKilledException 
      */
-    private synchronized void dump() {
+    private synchronized void dump() throws UpdatableSortedLinkedListKilledException {
+       if(killed) throw new UpdatableSortedLinkedListKilledException();
         for(Enumeration e=list.elements();e.hasMoreElements();) {
             UpdatableSortedLinkedListItem item = (UpdatableSortedLinkedListItem) e.nextElement();
             Logger.minor(this, item.toString());
@@ -177,8 +180,10 @@

     /**
      * @return an array, in order, of the elements in the list
+     * @throws UpdatableSortedLinkedListKilledException 
      */
-    public synchronized UpdatableSortedLinkedListItem[] toArray() {
+    public synchronized UpdatableSortedLinkedListItem[] toArray() throws UpdatableSortedLinkedListKilledException {
+       if(killed) throw new UpdatableSortedLinkedListKilledException();
         int size = list.size();
         if(size < 0)
                throw new IllegalStateException("list.size() = "+size+" for "+this);
@@ -210,4 +215,9 @@
     public synchronized void clear() {
         list.clear();
     }
+    
+    public synchronized void kill() {
+       clear();
+       killed = true;
+    }
 }

Added: trunk/freenet/src/freenet/support/UpdatableSortedLinkedListKilledException.java
===================================================================
--- trunk/freenet/src/freenet/support/UpdatableSortedLinkedListKilledException.java     2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/support/UpdatableSortedLinkedListKilledException.java     2005-11-15 21:10:36 UTC (rev 7543)
@@ -0,0 +1,5 @@
+package freenet.support;
+
+public class UpdatableSortedLinkedListKilledException extends Exception {
+
+}

Modified: trunk/freenet/src/freenet/support/UpdatableSortedLinkedListWithForeignIndex.java
===================================================================
--- trunk/freenet/src/freenet/support/UpdatableSortedLinkedListWithForeignIndex.java    2005-11-12 22:23:39 UTC (rev 7542)
+++ trunk/freenet/src/freenet/support/UpdatableSortedLinkedListWithForeignIndex.java    2005-11-15 21:10:36 UTC (rev 7543)
@@ -18,7 +18,7 @@
         map = new HashMap();
     }

-    public synchronized void add(UpdatableSortedLinkedListItem item) {
+    public synchronized void add(UpdatableSortedLinkedListItem item) throws UpdatableSortedLinkedListKilledException {
         if(!(item instanceof IndexableUpdatableSortedLinkedListItem)) {
             throw new IllegalArgumentException();
         }
@@ -33,10 +33,9 @@
         checkList();
     }

-    public synchronized void remove(UpdatableSortedLinkedListItem item) {
+    public synchronized void remove(UpdatableSortedLinkedListItem item) throws UpdatableSortedLinkedListKilledException {
         super.remove(item);
         
map.remove(((IndexableUpdatableSortedLinkedListItem)item).indexValue());
-        checkList();
     }

     public synchronized boolean containsKey(Object o) {
@@ -45,8 +44,9 @@

     /**
      * Remove an element from the list by its key.
+     * @throws UpdatableSortedLinkedListKilledException 
      */
-    public synchronized void removeByKey(Object key) {
+    public synchronized void removeByKey(Object key) throws UpdatableSortedLinkedListKilledException {
         IndexableUpdatableSortedLinkedListItem item = 
             (IndexableUpdatableSortedLinkedListItem) map.get(key);
         if(item != null) remove(item);


Reply via email to