Author: toad
Date: 2008-12-13 20:18:41 +0000 (Sat, 13 Dec 2008)
New Revision: 24297

Modified:
   trunk/freenet/src/freenet/io/comm/PeerContext.java
   trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
   trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
   trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
   trunk/freenet/src/freenet/node/FailureTable.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/SSKInsertSender.java
Log:
Add a callback to sendThrottledMessage(). In a bulk send, wait for all packets 
in flight to be acknowledged before failing with a timeout.


Modified: trunk/freenet/src/freenet/io/comm/PeerContext.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/PeerContext.java  2008-12-13 19:41:21 UTC 
(rev 24296)
+++ trunk/freenet/src/freenet/io/comm/PeerContext.java  2008-12-13 20:18:41 UTC 
(rev 24297)
@@ -38,7 +38,7 @@
        
        /** Send a throttled message to the node (may block for a long time). 
         * @throws SyncSendWaitedTooLongException */
-       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr, int timeout, boolean waitForSent) throws 
NotConnectedException, WaitedTooLongException, SyncSendWaitedTooLongException;
+       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr, int timeout, boolean waitForSent, AsyncMessageCallback 
callback) throws NotConnectedException, WaitedTooLongException, 
SyncSendWaitedTooLongException;
        
        /** Get the current boot ID. This is a random number that changes every 
time the node starts up. */
        public long getBootID();

Modified: trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2008-12-13 
19:41:21 UTC (rev 24296)
+++ trunk/freenet/src/freenet/io/xfer/BlockTransmitter.java     2008-12-13 
20:18:41 UTC (rev 24297)
@@ -98,7 +98,7 @@
                                        }
                                        int totalPackets;
                                        try {
-                                               
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr, SEND_TIMEOUT, 
false);
+                                               
_destination.sendThrottledMessage(DMT.createPacketTransmit(_uid, packetNo, 
_sentPackets, _prb.getPacket(packetNo)), _prb._packetSize, _ctr, SEND_TIMEOUT, 
false, null);
                                                
totalPackets=_prb.getNumPackets();
                                        } catch (NotConnectedException e) {
                                                Logger.normal(this, 
"Terminating send: "+e);

Modified: trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2008-12-13 
19:41:21 UTC (rev 24296)
+++ trunk/freenet/src/freenet/io/xfer/BulkTransmitter.java      2008-12-13 
20:18:41 UTC (rev 24297)
@@ -3,6 +3,7 @@
  * http://www.gnu.org/ for further details of the GPL. */
 package freenet.io.xfer;
 
+import freenet.io.comm.AsyncMessageCallback;
 import freenet.io.comm.AsyncMessageFilterCallback;
 import freenet.io.comm.ByteCounter;
 import freenet.io.comm.DMT;
@@ -217,8 +218,24 @@
                                        completed();
                                        return true;
                                }
-                               // Wait for a packet, BulkReceivedAll or 
BulkReceiveAborted
                                synchronized(this) {
+                                       // Wait for all packets to complete
+                                       while(true) {
+                                               if(failedPacket) {
+                                                       cancel("Packet send 
failed");
+                                                       return false;
+                                               }
+                                               
if(Logger.shouldLog(Logger.MINOR, this))
+                                                       Logger.minor(this, 
"Waiting for packets: remaining: "+inFlightPackets);
+                                               if(inFlightPackets == 0) break;
+                                               try {
+                                                       wait();
+                                               } catch (InterruptedException 
e) {
+                                                       // Ignore
+                                               }
+                                       }
+                                       
+                                       // Wait for a packet to come in, 
BulkReceivedAll or BulkReceiveAborted
                                        try {
                                                wait(60*1000);
                                        } catch (InterruptedException e) {
@@ -229,7 +246,7 @@
                                long end = System.currentTimeMillis();
                                if(end - lastSentPacket > TIMEOUT) {
                                        Logger.error(this, "Send timed out on 
"+this);
-                                       cancel("Timeout");
+                                       cancel("Timeout awaiting 
BulkReceivedAll");
                                        return false;
                                }
                                continue;
@@ -245,7 +262,8 @@
                        
                        // Congestion control and bandwidth limiting
                        try {
-                               
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf), 
buf.length, ctr, BulkReceiver.TIMEOUT, false);
+                               if(logMINOR) Logger.minor(this, "Sending packet 
"+blockNo);
+                               
peer.sendThrottledMessage(DMT.createFNPBulkPacketSend(uid, blockNo, buf), 
buf.length, ctr, BulkReceiver.TIMEOUT, false, new UnsentPacketTag());
                                synchronized(this) {
                                        blocksNotSentButPresent.setBit(blockNo, 
false);
                                }
@@ -266,6 +284,56 @@
                }
        }
        
+       private int inFlightPackets = 0;
+       private boolean failedPacket = false;
+       
+       private class UnsentPacketTag implements AsyncMessageCallback {
+
+               private boolean finished;
+               
+               private UnsentPacketTag() {
+                       synchronized(BulkTransmitter.this) {
+                               inFlightPackets++;
+                       }
+               }
+               
+               public void acknowledged() {
+                       complete(false);
+               }
+
+               private void complete(boolean failed) {
+                       synchronized(this) {
+                               if(finished) return;
+                               finished = true;
+                       }
+                       synchronized(BulkTransmitter.this) {
+                               if(failed) {
+                                       failedPacket = true;
+                                       BulkTransmitter.this.notifyAll();
+                                       if(Logger.shouldLog(Logger.MINOR, 
this)) Logger.minor(this, "Packet failed for "+BulkTransmitter.this);
+                               } else {
+                                       inFlightPackets--;
+                                       if(inFlightPackets <= 0)
+                                               
BulkTransmitter.this.notifyAll();
+                                       if(Logger.shouldLog(Logger.MINOR, 
this)) Logger.minor(this, "Packet sent "+BulkTransmitter.this+" remaining in 
flight: "+inFlightPackets);
+                               }
+                       }
+               }
+
+               public void disconnected() {
+                       complete(true);
+               }
+
+               public void fatalError() {
+                       complete(true);
+               }
+
+               public void sent() {
+                       // Wait for acknowledgment
+               }
+               
+       }
+       
        @Override
        public String toString() {
                return "BulkTransmitter:"+uid+":"+peer.shortToString();

Modified: trunk/freenet/src/freenet/io/xfer/PacketThrottle.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-12-13 
19:41:21 UTC (rev 24296)
+++ trunk/freenet/src/freenet/io/xfer/PacketThrottle.java       2008-12-13 
20:18:41 UTC (rev 24297)
@@ -146,7 +146,7 @@
                return ((PACKET_SIZE * 1000.0 / getDelay()));
        }
        
-       public void sendThrottledMessage(Message msg, PeerContext peer, int 
packetSize, ByteCounter ctr, long deadline, boolean blockForSend) throws 
NotConnectedException, ThrottleDeprecatedException, WaitedTooLongException, 
SyncSendWaitedTooLongException {
+       public void sendThrottledMessage(Message msg, PeerContext peer, int 
packetSize, ByteCounter ctr, long deadline, boolean blockForSend, 
AsyncMessageCallback cbForAsyncSend) throws NotConnectedException, 
ThrottleDeprecatedException, WaitedTooLongException, 
SyncSendWaitedTooLongException {
                long start = System.currentTimeMillis();
                long bootID = peer.getBootID();
                synchronized(this) {
@@ -226,7 +226,7 @@
                        Logger.error(this, "Congestion control wait time: 
"+waitTime+" for "+this);
                else if(logMINOR)
                        Logger.minor(this, "Congestion control wait time: 
"+waitTime+" for "+this);
-               MyCallback callback = new MyCallback();
+               MyCallback callback = new MyCallback(cbForAsyncSend);
                try {
                        peer.sendAsync(msg, callback, ctr);
                        ctr.sentPayload(packetSize);
@@ -266,6 +266,12 @@
 
                private boolean finished = false;
                
+               private AsyncMessageCallback chainCallback;
+               
+               public MyCallback(AsyncMessageCallback cbForAsyncSend) {
+                       this.chainCallback = cbForAsyncSend;
+               }
+
                public void acknowledged() {
                        synchronized(PacketThrottle.this) {
                                if(finished) {
@@ -277,6 +283,7 @@
                                PacketThrottle.this.notifyAll();
                        }
                        if(logMINOR) Logger.minor(this, "Removed packet: acked 
for "+this);
+                       if(chainCallback != null) chainCallback.acknowledged();
                }
 
                public void disconnected() {
@@ -287,6 +294,7 @@
                                PacketThrottle.this.notifyAll();
                        }
                        if(logMINOR) Logger.minor(this, "Removed packet: 
disconnected for "+this);
+                       if(chainCallback != null) chainCallback.disconnected();
                }
 
                public void fatalError() {
@@ -297,6 +305,7 @@
                                PacketThrottle.this.notifyAll();
                        }
                        if(logMINOR) Logger.minor(this, "Removed packet: error 
for "+this);
+                       if(chainCallback != null) chainCallback.fatalError();
                }
 
                public void sent() {

Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java    2008-12-13 19:41:21 UTC 
(rev 24296)
+++ trunk/freenet/src/freenet/node/FailureTable.java    2008-12-13 20:18:41 UTC 
(rev 24297)
@@ -424,7 +424,7 @@
 
                                public void run() {
                                        try {
-                                               
source.sendThrottledMessage(data, dataLength, senderCounter, 60*1000, false);
+                                               
source.sendThrottledMessage(data, dataLength, senderCounter, 60*1000, false, 
null);
                                        } catch (NotConnectedException e) {
                                                // :(
                                        } catch (WaitedTooLongException e) {

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java        2008-12-13 19:41:21 UTC 
(rev 24296)
+++ trunk/freenet/src/freenet/node/PeerNode.java        2008-12-13 20:18:41 UTC 
(rev 24297)
@@ -4044,12 +4044,12 @@
                return resendBytesSent;
        }
        
-       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr, int timeout, boolean blockForSend) throws 
NotConnectedException, WaitedTooLongException, SyncSendWaitedTooLongException {
+       public void sendThrottledMessage(Message msg, int packetSize, 
ByteCounter ctr, int timeout, boolean blockForSend, AsyncMessageCallback 
callback) throws NotConnectedException, WaitedTooLongException, 
SyncSendWaitedTooLongException {
                long deadline = System.currentTimeMillis() + timeout;
                if(logMINOR) Logger.minor(this, "Sending throttled message with 
timeout "+timeout+" packet size "+packetSize+" to "+shortToString());
                for(int i=0;i<100;i++) {
                        try {
-                               getThrottle().sendThrottledMessage(msg, this, 
packetSize, ctr, deadline, blockForSend);
+                               getThrottle().sendThrottledMessage(msg, this, 
packetSize, ctr, deadline, blockForSend, callback);
                                return;
                        } catch (ThrottleDeprecatedException e) {
                                // Try with the new throttle. We don't need it, 
we'll get it from getThrottle().
@@ -4058,6 +4058,7 @@
                }
                Logger.error(this, "Peer constantly changes its IP address!!: 
"+shortToString());
                forceDisconnect(true);
+               throw new NotConnectedException();
        }
 
        /**

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java  2008-12-13 19:41:21 UTC 
(rev 24296)
+++ trunk/freenet/src/freenet/node/RequestHandler.java  2008-12-13 20:18:41 UTC 
(rev 24297)
@@ -349,7 +349,7 @@
 
                        public void run() {
                                try {
-                                       source.sendThrottledMessage(dataMsg, 
data.length, RequestHandler.this, 60 * 1000, true);
+                                       source.sendThrottledMessage(dataMsg, 
data.length, RequestHandler.this, 60 * 1000, true, null);
                                        applyByteCounts();
                                } catch(NotConnectedException e) {
                                        // Okay
@@ -383,7 +383,7 @@
                source.sendAsync(headersMsg, null, ctr);
                final Message dataMsg = DMT.createFNPSSKDataFoundData(uid, 
data);
                try {
-                       source.sendThrottledMessage(dataMsg, data.length, ctr, 
60 * 1000, false);
+                       source.sendThrottledMessage(dataMsg, data.length, ctr, 
60 * 1000, false, null);
                } catch(SyncSendWaitedTooLongException e) {
                        // Impossible
                        throw new Error(e);

Modified: trunk/freenet/src/freenet/node/SSKInsertSender.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-12-13 19:41:21 UTC 
(rev 24296)
+++ trunk/freenet/src/freenet/node/SSKInsertSender.java 2008-12-13 20:18:41 UTC 
(rev 24297)
@@ -249,7 +249,7 @@
             
             try {
                                next.sendAsync(headersMsg, null, this);
-                               next.sendThrottledMessage(dataMsg, data.length, 
this, SSKInsertHandler.DATA_INSERT_TIMEOUT, false);
+                               next.sendThrottledMessage(dataMsg, data.length, 
this, SSKInsertHandler.DATA_INSERT_TIMEOUT, false, null);
                        } catch (NotConnectedException e1) {
                                if(logMINOR) Logger.minor(this, "Not connected 
to "+next);
                                continue;

_______________________________________________
cvs mailing list
[email protected]
http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to