Author: toad
Date: 2007-05-30 22:57:52 +0000 (Wed, 30 May 2007)
New Revision: 13405

Modified:
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/io/comm/PeerContext.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
   trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/support/BitArray.java
Log:
More work on bulk transmission.
Just need to code the receiver, and wire it in, and test it...

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java  2007-05-30 22:54:39 UTC (rev 
13404)
+++ trunk/freenet/src/freenet/io/comm/DMT.java  2007-05-30 22:57:52 UTC (rev 
13405)
@@ -184,6 +184,10 @@
                        BitArray.serializedLength(_packets) + 4 /* Message 
header */;
        }

+       public static int bulkPacketTransmitSize(int size) {
+               return size + 8 /* uid */ + 4 /* packet# */ + 4 /* Message 
hader */;
+       }
+       
        public static final MessageType allSent = new MessageType("allSent") {{
                addField(UID, Long.class);
        }};

Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java  2007-05-30 22:54:39 UTC 
(rev 13404)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java  2007-05-30 22:57:52 UTC 
(rev 13405)
@@ -3,6 +3,8 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.io.comm;

+import freenet.io.xfer.PacketThrottle;
+
 /**
  * @author amphibian
  * 
@@ -27,4 +29,11 @@

        /** Send a message to the node */
        public void sendAsync(Message msg, AsyncMessageCallback cb, int 
alreadyReportedBytes, ByteCounter ctr) throws NotConnectedException;
+       
+       /** Get the current boot ID. This is a random number that changes every 
time the node starts up. */
+       public long getBootID();
+
+       /** Get the PacketThrottle for the node's current address for the 
standard packet size (if the 
+        * address changes then we get a new throttle). */ 
+       public PacketThrottle getThrottle();
 }

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2007-05-30 
22:54:39 UTC (rev 13404)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2007-05-30 
22:57:52 UTC (rev 13405)
@@ -75,7 +75,7 @@
                        Logger.error(this, "Aborted during setup");
                        // Will throw on running
                }
-               throttle = PacketThrottle.getThrottle(_destination.getPeer(), 
_prb._packetSize);
+               throttle = _destination.getThrottle();
                _senderThread = new Thread("_senderThread for "+_uid+ " to 
"+_destination.getPeer()) {

                        public void run() {

Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2007-05-30 
22:54:39 UTC (rev 13404)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2007-05-30 
22:57:52 UTC (rev 13405)
@@ -7,6 +7,7 @@
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
 import freenet.support.BitArray;
+import freenet.support.DoubleTokenBucket;

 /**
  * Bulk data transfer (not block). Bulk transfer is designed for files which 
may be much bigger than a 
@@ -25,11 +26,19 @@
        final long uid;
        /** Blocks we have but haven't sent yet. 0 = block sent or not present, 
1 = block present but not sent */
        final BitArray blocksNotSentButPresent;
+       private boolean cancelled;
+       /** Not persistent over reboots */
+       final long peerBootID;
+       /** The overall hard bandwidth limiter */
+       final DoubleTokenBucket masterThrottle;
+       

-       public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer, 
long uid) {
+       public BulkTransmitter(PartiallyReceivedBulk prb, PeerContext peer, 
long uid, DoubleTokenBucket masterThrottle) {
                this.prb = prb;
                this.peer = peer;
                this.uid = uid;
+               this.masterThrottle = masterThrottle;
+               peerBootID = peer.getBootID();
                // Need to sync on prb while doing both operations, to avoid 
race condition.
                // Specifically, we must not get calls to blockReceived() until 
blocksNotSentButPresent
                // has been set, AND it must be accurate, so there must not be 
an unlocked period
@@ -48,6 +57,7 @@
         */
        synchronized void blockReceived(int block) {
                blocksNotSentButPresent.setBit(block, true);
+               notifyAll();
        }

        /**
@@ -59,6 +69,88 @@
                } catch (NotConnectedException e) {
                        // Cool
                }
+               synchronized(this) {
+                       notifyAll();
+               }
        }

+       public void cancel() {
+               try {
+                       peer.sendAsync(DMT.createFNPBulkSendAborted(uid), null, 
0, null);
+               } catch (NotConnectedException e) {
+                       // Cool
+               }
+               synchronized(this) {
+                       cancelled = true;
+                       notifyAll();
+               }
+               prb.remove(this);
+       }
+       
+       class Sender implements Runnable {
+
+               public void run() {
+                       while(true) {
+                               if(prb.isAborted()) return;
+                               int blockNo;
+                               if(peer.getBootID() != peerBootID) {
+                                       synchronized(this) {
+                                               cancelled = true;
+                                               notifyAll();
+                                       }
+                                       prb.remove(BulkTransmitter.this);
+                                       return;
+                               }
+                               boolean hasAll = prb.hasWholeFile();
+                               synchronized(this) {
+                                       if(cancelled) return;
+                                       blockNo = 
blocksNotSentButPresent.firstOne();
+                               }
+                               if(blockNo < 0 && hasAll) {
+                                       prb.remove(BulkTransmitter.this);
+                                       return; // All done
+                               }
+                               else if(blockNo < 0) {
+                                       synchronized(this) {
+                                               try {
+                                                       wait(60*1000);
+                                               } catch (InterruptedException 
e) {
+                                                       // No problem
+                                               }
+                                               continue;
+                                       }
+                               }
+                               // Send a packet
+                               byte[] buf = prb.getBlockData(blockNo);
+                               if(buf == null) {
+                                       // Already cancelled, quit
+                                       return;
+                               }
+                               
+                               // Congestion control and bandwidth limiting
+                               long now = System.currentTimeMillis();
+                               long waitUntil = 
peer.getThrottle().scheduleDelay(now);
+                               
+                               
masterThrottle.blockingGrab(prb.getPacketSize());
+                               
+                               while((now = System.currentTimeMillis()) < 
waitUntil) {
+                                       long sleepTime = waitUntil - now;
+                                       try {
+                                               Thread.sleep(sleepTime);
+                                       } catch (InterruptedException e) {
+                                               // Ignore
+                                       }
+                               }
+                               // FIXME should this be reported on 
bwlimitDelayTime ???
+                               
+                               try {
+                                       
peer.sendAsync(DMT.createFNPBulkPacketSend(uid, blockNo, buf), null, 0, null);
+                               } catch (NotConnectedException e) {
+                                       cancel();
+                                       return;
+                               }
+                       }
+               }
+               
+       }
 }

Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java        
2007-05-30 22:54:39 UTC (rev 13404)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java        
2007-05-30 22:57:52 UTC (rev 13405)
@@ -3,7 +3,11 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.io.xfer;

+import java.io.IOException;
+
+import freenet.io.comm.DMT;
 import freenet.io.comm.RetrievalException;
+import freenet.node.FNPPacketMangler;
 import freenet.support.BitArray;
 import freenet.support.Logger;
 import freenet.support.io.RandomAccessThing;
@@ -28,6 +32,8 @@
        private BulkTransmitter[] transmitters;
        /** The one and only BulkReceiver */
        private BulkReceiver recv;
+       private int blocksReceivedCount;
+       final int packetSize;
        // Abort status
        boolean _aborted;
        int _abortReason;
@@ -50,8 +56,12 @@
                        throw new IllegalArgumentException("Too big");
                this.blocks = (int)blocks;
                blocksReceived = new BitArray(this.blocks);
-               if(initialState)
+               if(initialState) {
                        blocksReceived.setAllOnes();
+                       blocksReceivedCount = this.blocks;
+               }
+               packetSize = DMT.bulkPacketTransmitSize(blockSize) + 
+                       FNPPacketMangler.FULL_HEADERS_LENGTH_ONE_MESSAGE; // 
FIXME generalise
        }

        /**
@@ -90,6 +100,7 @@
                synchronized(this) {
                        if(blocksReceived.bitAt(blockNum)) return; // ignore
                        blocksReceived.setBit(blockNum, true); // assume the 
rest of the function succeeds
+                       blocksReceivedCount++;
                        notifyBTs = transmitters;
                }
                try {
@@ -126,21 +137,40 @@
                        notifyBR.onAborted();
        }

-//     /**
-//      * Fail the transfer because of an unrecoverable exception e.g. an 
error in storing the data.
-//      * @param t The throwable causing this failure.
-//      */
-//     private void fail(Throwable t) {
-//             Logger.error(this, "Failing transfer: "+this+" : "+t, t);
-//             BulkTransmitter[] notifyBTs;
-//             synchronized(this) {
-//                     notifyBTs = transmitters;
-//             }
-//             if(notifyBTs != null) {
-//                     for(int i=0;i<notifyBTs.length;i++)
-//                             notifyBTs[i].fail(t);
-//             }
-//             if(recv != null)
-//                     recv.fail(t);
-//     }
+       public synchronized boolean isAborted() {
+               return _aborted;
+       }
+
+       public int getPacketSize() {
+               return packetSize;
+       }
+
+       public boolean hasWholeFile() {
+               return blocksReceivedCount >= blocks;
+       }
+
+       public byte[] getBlockData(int blockNum) {
+               long fileOffset = (long)blockNum * (long)blockSize;
+               int bs = (int) Math.max(blockSize, size - fileOffset);
+               byte[] data = new byte[bs];
+               try {
+                       raf.pread(fileOffset, data, 0, bs);
+               } catch (IOException e) {
+                       Logger.error(this, "Failed to read stored block 
"+blockNum+" on "+this+" : "+e, e);
+                       abort(RetrievalException.IO_ERROR, e.toString());
+                       return null;
+               }
+               return data;
+       }
+
+       public synchronized void remove(BulkTransmitter remove) {
+               BulkTransmitter[] newTrans = new 
BulkTransmitter[transmitters.length-1];
+               int j = 0;
+               for(int i=0;i<transmitters.length;i++) {
+                       BulkTransmitter t = transmitters[i];
+                       if(t == remove) continue;
+                       newTrans[j++] = t;
+               }
+               transmitters = newTrans;
+       }
 }

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2007-05-30 22:54:39 UTC 
(rev 13404)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2007-05-30 22:57:52 UTC 
(rev 13405)
@@ -1572,6 +1572,10 @@
                return true;
     }

+    public long getBootID() {
+       return bootID;
+    }
+    
     private volatile Object arkFetcherSync = new Object();

     void startARKFetcher() {

Modified: trunk/freenet/src/freenet/support/BitArray.java
===================================================================
--- trunk/freenet/src/freenet/support/BitArray.java     2007-05-30 22:54:39 UTC 
(rev 13404)
+++ trunk/freenet/src/freenet/support/BitArray.java     2007-05-30 22:57:52 UTC 
(rev 13405)
@@ -116,4 +116,17 @@
                for(int i=0;i<_bits.length;i++)
                        _bits[i] = (byte)0xFF;
        }
+
+       public int firstOne() {
+               for(int i=0;i<_bits.length;i++) {
+                       byte b = _bits[i];
+                       if(b == 0) continue;
+                       for(int j=0;j<8;j++) {
+                               int mask = (1 << j);
+                               if((b & mask) != 0)
+                                       return i*8+j;
+                       }
+               }
+               return -1;
+       }
 }
\ No newline at end of file


Reply via email to