Author: toad
Date: 2007-06-01 11:34:51 +0000 (Fri, 01 Jun 2007)
New Revision: 13435

Modified:
   trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
   trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
Log:
Completed receiving code.

Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-06-01 11:15:19 UTC 
(rev 13434)
+++ trunk/freenet/src/freenet/io/xfer/BulkReceiver.java 2007-06-01 11:34:51 UTC 
(rev 13435)
@@ -4,8 +4,13 @@
 package freenet.io.xfer;

 import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
+import freenet.io.comm.RetrievalException;
+import freenet.support.ShortBuffer;

 /**
  * Bulk (not block) data transfer - receiver class. Bulk transfer is designed 
for largish files, much
@@ -13,7 +18,8 @@
  * @author toad
  */
 public class BulkReceiver {
-       
+
+       static final int TIMEOUT = 5*60*1000;
        /** Tracks the data we have received */
        final PartiallyReceivedBulk prb;
        /** Peer we are receiving from */
@@ -21,11 +27,14 @@
        /** Transfer UID for messages */
        final long uid;
        private boolean sentCancel;
+       /** Not persistent over reboots */
+       final long peerBootID;

        public BulkReceiver(PartiallyReceivedBulk prb, PeerContext peer, long 
uid) {
                this.prb = prb;
                this.peer = peer;
                this.uid = uid;
+               this.peerBootID = peer.getBootID(); // snapshot taken at construction; receive() compares the peer's current boot ID against it to detect a sender restart
        }

        public void onAborted() {
@@ -40,4 +49,39 @@
                }
        }

+       /**
+        * Receive the file. Loops waiting for either an FNPBulkPacketSend or an
+        * FNPBulkSendAborted message from the peer carrying this transfer's uid, and
+        * passes each packet's payload to prb.received().
+        * Aborts prb and returns false when the wait throws DisconnectedException, when
+        * the peer's boot ID no longer matches the one recorded in the constructor, when
+        * the wait returns null, or when the sender reports that it aborted the send.
+        * @return True if the whole file was received, false otherwise.
+        */
+       public boolean receive() {
+               MessageFilter mfSendKilled = 
MessageFilter.create().setSource(peer).setType(DMT.FNPBulkSendAborted) 
.setField(DMT.UID, uid).setTimeout(TIMEOUT);
+               MessageFilter mfPacket = 
MessageFilter.create().setSource(peer).setType(DMT.FNPBulkPacketSend) 
.setField(DMT.UID, uid).setTimeout(TIMEOUT);
+               while(true) { // NOTE(review): the two filters built above are reused on every pass - confirm MessageFilter may be waited on again after it has matched
+                       if(prb.hasWholeFile()) return true; // checked before every wait; the only successful exit
+                       Message m;
+                       try {
+                               m = prb.usm.waitFor(mfSendKilled.or(mfPacket), 
null);
+                       } catch (DisconnectedException e) {
+                               
prb.abort(RetrievalException.SENDER_DISCONNECTED, "Sender disconnected");
+                               return false;
+                       }
+                       if(peer.getBootID() != peerBootID) { // tested before looking at m, so a restart wins over whatever the wait returned
+                               prb.abort(RetrievalException.SENDER_DIED, 
"Sender restarted");
+                               return false;
+                       }
+                       if(m == null) { // no message came back; presumably waitFor returns null once TIMEOUT expires - confirm
+                               prb.abort(RetrievalException.TIMED_OUT, "Sender 
timeout");
+                               return false;
+                       }
+                       if(m.getSpec() == DMT.FNPBulkSendAborted) {
+                               prb.abort(RetrievalException.SENDER_DIED, 
"Sender cancelled send");
+                               return false;
+                       }
+                       if(m.getSpec() == DMT.FNPBulkPacketSend) { // data packet: extract block number and payload
+                               int packetNo = m.getInt(DMT.PACKET_NO);
+                               byte[] data = ((ShortBuffer) 
m.getObject(DMT.DATA)).getData();
+                               prb.received(packetNo, data, 0, data.length); // the whole payload buffer, starting at offset 0
+                       } // no else branch: any other message is ignored and the loop waits again
+               }
+       }
 }

Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java        
2007-06-01 11:15:19 UTC (rev 13434)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java        
2007-06-01 11:34:51 UTC (rev 13435)
@@ -96,10 +96,18 @@
         * Called when a block has been received. Will copy the data from the 
provided buffer and store it.
         * @param blockNum The block number.
         * @param data The byte array from which to read the data.
-        * @param offset The start of the 
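+        * @param offset The start of the data in the buffer.
+        * @param length The number of bytes of data supplied; the block is rejected as too short if this is less than the expected block length.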
         */
-       void received(int blockNum, byte[] data, int offset) {
+       void received(int blockNum, byte[] data, int offset, int length) {
                BulkTransmitter[] notifyBTs;
+               long fileOffset = (long)blockNum * (long)blockSize;
+               int bs = (int) Math.max(blockSize, size - fileOffset);
+               if(length < bs) {
+                       String err = "Data too short! Should be "+bs+" actually 
"+length;
+                       Logger.error(this, err+" for "+this);
+                       abort(RetrievalException.PREMATURE_EOF, err);
+                       return;
+               }
                synchronized(this) {
                        if(blocksReceived.bitAt(blockNum)) return; // ignore
                        blocksReceived.setBit(blockNum, true); // assume the 
rest of the function succeeds
@@ -107,8 +115,6 @@
                        notifyBTs = transmitters;
                }
                try {
-                       long fileOffset = (long)blockNum * (long)blockSize;
-                       int bs = (int) Math.max(blockSize, size - fileOffset);
                        raf.pwrite(fileOffset, data, offset, bs);
                } catch (Throwable t) {
                        Logger.error(this, "Failed to store received block 
"+blockNum+" on "+this+" : "+t, t);


Reply via email to